You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/22 11:31:43 UTC

[1/2] incubator-kylin git commit: check all TODO

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-942 5ac205501 -> e877e8351


check all TODO


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b0364897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b0364897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b0364897

Branch: refs/heads/KYLIN-942
Commit: b0364897658d5212181b2547555536a3e18fcc1d
Parents: 5ac2055
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 17:05:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 17:05:20 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/gridtable/TrimmedCubeCodeSystem.java  | 15 ++++++---------
 .../storage/hbase/cube/v2/CubeHBaseEndpointRPC.java  |  1 -
 .../coprocessor/endpoint/EndpointTupleIterator.java  |  1 -
 3 files changed, 6 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0364897/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index ea020f3..e4f32fb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -74,15 +74,12 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem {
     @Override
     public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
         DataTypeSerializer serializer = serializers[col];
-        if (serializer instanceof CubeCodeSystem.TrimmedDictionarySerializer || serializer instanceof CubeCodeSystem.DictionarySerializer) {
-            //TODO: remove this check
-            throw new IllegalStateException("Encode dictionary value in coprocessor");
-        } else {
-            if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) {
-                value = serializer.valueOf((String) value);
-            }
-            serializer.serialize(value, buf);
-        }
+
+//        if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) {
+//            value = serializer.valueOf((String) value);
+//        }
+        
+        serializer.serialize(value, buf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0364897/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index bb2a18a..9aba128 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -221,7 +221,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     }
 
-    //TODO : async callback ?
     private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
         Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
             public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0364897/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 9ff7aa4..92c7e05 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -288,7 +288,6 @@ public class EndpointTupleIterator implements ITupleIterator {
         return request;
     }
 
-    //TODO : async callback
     private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
         Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
             public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {


[2/2] incubator-kylin git commit: HBaseZeroCopyByteString

Posted by ma...@apache.org.
HBaseZeroCopyByteString


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e877e835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e877e835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e877e835

Branch: refs/heads/KYLIN-942
Commit: e877e83519c204b6fdaef71cf363d4058c4c14a3
Parents: b036489
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 17:30:36 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 17:30:36 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  3 ++-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  7 ++++---
 .../coprocessor/endpoint/CubeVisitService.java  |  8 +++----
 .../endpoint/EndpointTupleIterator.java         | 20 +++++++++---------
 .../ii/coprocessor/endpoint/IIEndpoint.java     | 22 ++++++++++----------
 5 files changed, 31 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..749962f 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,6 +10,7 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import com.google.protobuf.HBaseZeroCopyByteString;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -231,7 +232,7 @@ public class IITest extends LocalFileMetadataTestCase {
         System.out.println(response.getRowsList().size());
         Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
         for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
-            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+            byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
             List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
             Assert.assertTrue(answers.contains(metrics.get(0)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9aba128..86bc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -56,6 +56,7 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
@@ -143,7 +144,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
 
         byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
-        final ByteString scanRequestBytesString = ByteString.copyFrom(scanRequestBytes);
+        final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
 
         ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
         final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
@@ -162,7 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 public void run() {
                     final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
                     CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
-                    builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
+                    builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes));
                     for (IntList intList : hbaseColumnsToGTIntList) {
                         builder.addHbaseColumnsToGT(intList);
                     }
@@ -188,7 +189,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                         @Override
                         public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
                             try {
-                                return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
+                                return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5e14474..ba766bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -126,8 +126,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         try {
             this.serviceStartTime = System.currentTimeMillis();
 
-            GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class);
-            RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class);
+            GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class);
+            RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class);
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
@@ -161,7 +161,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             byte[] allRows = outputStream.toByteArray();
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
-                    setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies 
+                    setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies 
                     setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
                             setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
                             setScannedRowCount(finalScanner.getScannedRowCount()).//

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 92c7e05..6d3ec4d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -51,9 +51,9 @@ import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.cache.TsConditionExtractor;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
@@ -67,7 +67,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 /**
  */
@@ -152,7 +152,7 @@ public class EndpointTupleIterator implements ITupleIterator {
         //decompress
         Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
         for (IIProtos.IIResponse input : compressedShardResults) {
-            byte[] compressed = input.getBlob().toByteArray();
+            byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
             try {
                 byte[] decompressed = CompressionUtils.decompress(compressed);
                 shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
@@ -275,13 +275,13 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         if (this.tsRange != null) {
             byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
-            builder.setTsRange(ByteString.copyFrom(tsRangeBytes));
+            builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
         }
 
-        builder.setType(ByteString.copyFrom(CoprocessorRowType.serialize(pushedDownRowType))) //
-                .setFilter(ByteString.copyFrom(CoprocessorFilter.serialize(pushedDownFilter))) //
-                .setProjector(ByteString.copyFrom(CoprocessorProjector.serialize(pushedDownProjector))) //
-                .setAggregator(ByteString.copyFrom(EndpointAggregators.serialize(pushedDownAggregators)));
+        builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
+                .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
+                .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
+                .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));
 
         IIProtos.IIRequest request = builder.build();
 
@@ -337,10 +337,10 @@ public class EndpointTupleIterator implements ITupleIterator {
             }
 
             IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
-            byte[] columnsBytes = currentRow.getColumns().toByteArray();
+            byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
             this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
             if (currentRow.hasMeasures()) {
-                byte[] measuresBytes = currentRow.getMeasures().toByteArray();
+                byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getMeasures());
 
                 this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 5f8fefe..6173241 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -53,15 +53,15 @@ import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -88,7 +88,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
 
         if (request.hasTsRange()) {
-            Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(request.getTsRange().toByteArray());
+            Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
             byte[] regionStartKey = region.getStartKey();
             if (!ArrayUtils.isEmpty(regionStartKey)) {
                 shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
@@ -148,15 +148,15 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
 
             innerScanner = region.getScanner(prepareScan(request, region));
 
-            CoprocessorRowType type = CoprocessorRowType.deserialize(request.getType().toByteArray());
-            CoprocessorProjector projector = CoprocessorProjector.deserialize(request.getProjector().toByteArray());
-            EndpointAggregators aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
-            CoprocessorFilter filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
+            CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
+            CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
+            EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
+            CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
 
             //compression
             IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter);
             byte[] compressed = CompressionUtils.compress(response.toByteArray());
-            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(ByteString.copyFrom(compressed)).build();
+            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
 
             done.run(compressedR);
         } catch (IOException ioe) {
@@ -257,7 +257,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
                     if (totalByteFormLen >= MEMORY_LIMIT) {
                         throw new RuntimeException("the query has exceeded the memory limit, please check the query");
                     }
-                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(recordBuffer));
+                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(recordBuffer));
                     responseBuilder.addRows(rowBuilder.build());
                     totalByteFormLen += byteFormLen;
                 }
@@ -269,9 +269,9 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         if (needAgg) {
             for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
                 AggrKey aggrKey = entry.getKey();
-                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
+                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
                 int length = aggregators.serializeMetricValues(entry.getValue(), buffer);
-                rowBuilder.setMeasures(ByteString.copyFrom(buffer, 0, length));
+                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, 0, length));
                 responseBuilder.addRows(rowBuilder.build());
             }
         }