You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/22 11:31:44 UTC

[2/2] incubator-kylin git commit: HBaseZeroCopyByteString

HBaseZeroCopyByteString


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e877e835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e877e835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e877e835

Branch: refs/heads/KYLIN-942
Commit: e877e83519c204b6fdaef71cf363d4058c4c14a3
Parents: b036489
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 17:30:36 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 17:30:36 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  3 ++-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  7 ++++---
 .../coprocessor/endpoint/CubeVisitService.java  |  8 +++----
 .../endpoint/EndpointTupleIterator.java         | 20 +++++++++---------
 .../ii/coprocessor/endpoint/IIEndpoint.java     | 22 ++++++++++----------
 5 files changed, 31 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..749962f 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,6 +10,7 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import com.google.protobuf.HBaseZeroCopyByteString;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -231,7 +232,7 @@ public class IITest extends LocalFileMetadataTestCase {
         System.out.println(response.getRowsList().size());
         Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
         for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
-            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+            byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
             List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
             Assert.assertTrue(answers.contains(metrics.get(0)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9aba128..86bc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -56,6 +56,7 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
@@ -143,7 +144,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
 
         byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
-        final ByteString scanRequestBytesString = ByteString.copyFrom(scanRequestBytes);
+        final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
 
         ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
         final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
@@ -162,7 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 public void run() {
                     final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
                     CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
-                    builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
+                    builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes));
                     for (IntList intList : hbaseColumnsToGTIntList) {
                         builder.addHbaseColumnsToGT(intList);
                     }
@@ -188,7 +189,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                         @Override
                         public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
                             try {
-                                return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
+                                return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5e14474..ba766bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -126,8 +126,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         try {
             this.serviceStartTime = System.currentTimeMillis();
 
-            GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class);
-            RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class);
+            GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class);
+            RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class);
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
@@ -161,7 +161,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             byte[] allRows = outputStream.toByteArray();
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
-                    setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies 
+                    setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies 
                     setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
                             setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
                             setScannedRowCount(finalScanner.getScannedRowCount()).//

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 92c7e05..6d3ec4d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -51,9 +51,9 @@ import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.cache.TsConditionExtractor;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
@@ -67,7 +67,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 /**
  */
@@ -152,7 +152,7 @@ public class EndpointTupleIterator implements ITupleIterator {
         //decompress
         Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
         for (IIProtos.IIResponse input : compressedShardResults) {
-            byte[] compressed = input.getBlob().toByteArray();
+            byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
             try {
                 byte[] decompressed = CompressionUtils.decompress(compressed);
                 shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
@@ -275,13 +275,13 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         if (this.tsRange != null) {
             byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
-            builder.setTsRange(ByteString.copyFrom(tsRangeBytes));
+            builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
         }
 
-        builder.setType(ByteString.copyFrom(CoprocessorRowType.serialize(pushedDownRowType))) //
-                .setFilter(ByteString.copyFrom(CoprocessorFilter.serialize(pushedDownFilter))) //
-                .setProjector(ByteString.copyFrom(CoprocessorProjector.serialize(pushedDownProjector))) //
-                .setAggregator(ByteString.copyFrom(EndpointAggregators.serialize(pushedDownAggregators)));
+        builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
+                .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
+                .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
+                .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));
 
         IIProtos.IIRequest request = builder.build();
 
@@ -337,10 +337,10 @@ public class EndpointTupleIterator implements ITupleIterator {
             }
 
             IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
-            byte[] columnsBytes = currentRow.getColumns().toByteArray();
+            byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
             this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
             if (currentRow.hasMeasures()) {
-                byte[] measuresBytes = currentRow.getMeasures().toByteArray();
+                byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getMeasures());
 
                 this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 5f8fefe..6173241 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -53,15 +53,15 @@ import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -88,7 +88,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
 
         if (request.hasTsRange()) {
-            Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(request.getTsRange().toByteArray());
+            Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
             byte[] regionStartKey = region.getStartKey();
             if (!ArrayUtils.isEmpty(regionStartKey)) {
                 shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
@@ -148,15 +148,15 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
 
             innerScanner = region.getScanner(prepareScan(request, region));
 
-            CoprocessorRowType type = CoprocessorRowType.deserialize(request.getType().toByteArray());
-            CoprocessorProjector projector = CoprocessorProjector.deserialize(request.getProjector().toByteArray());
-            EndpointAggregators aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
-            CoprocessorFilter filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
+            CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
+            CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
+            EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
+            CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
 
             //compression
             IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter);
             byte[] compressed = CompressionUtils.compress(response.toByteArray());
-            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(ByteString.copyFrom(compressed)).build();
+            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
 
             done.run(compressedR);
         } catch (IOException ioe) {
@@ -257,7 +257,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
                     if (totalByteFormLen >= MEMORY_LIMIT) {
                         throw new RuntimeException("the query has exceeded the memory limit, please check the query");
                     }
-                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(recordBuffer));
+                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(recordBuffer));
                     responseBuilder.addRows(rowBuilder.build());
                     totalByteFormLen += byteFormLen;
                 }
@@ -269,9 +269,9 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         if (needAgg) {
             for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
                 AggrKey aggrKey = entry.getKey();
-                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
+                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
                 int length = aggregators.serializeMetricValues(entry.getValue(), buffer);
-                rowBuilder.setMeasures(ByteString.copyFrom(buffer, 0, length));
+                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, 0, length));
                 responseBuilder.addRows(rowBuilder.build());
             }
         }