You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/22 11:31:44 UTC
[2/2] incubator-kylin git commit: HBaseZeroCopyByteString
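HBaseZeroCopyByteString: replace ByteString.copyFrom(...) with HBaseZeroCopyByteString.wrap(...) and ByteString.toByteArray() with HBaseZeroCopyByteString.zeroCopyGetBytes(...) in the cube and inverted-index coprocessor endpoints, their clients, and IITest.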
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e877e835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e877e835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e877e835
Branch: refs/heads/KYLIN-942
Commit: e877e83519c204b6fdaef71cf363d4058c4c14a3
Parents: b036489
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 17:30:36 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 17:30:36 2015 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/invertedindex/IITest.java | 3 ++-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 ++++---
.../coprocessor/endpoint/CubeVisitService.java | 8 +++----
.../endpoint/EndpointTupleIterator.java | 20 +++++++++---------
.../ii/coprocessor/endpoint/IIEndpoint.java | 22 ++++++++++----------
5 files changed, 31 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..749962f 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,6 +10,7 @@ import java.util.Set;
import javax.annotation.Nullable;
+import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -231,7 +232,7 @@ public class IITest extends LocalFileMetadataTestCase {
System.out.println(response.getRowsList().size());
Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
- byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+ byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
Assert.assertTrue(answers.contains(metrics.get(0)));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9aba128..86bc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -56,6 +56,7 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@@ -143,7 +144,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
- final ByteString scanRequestBytesString = ByteString.copyFrom(scanRequestBytes);
+ final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
@@ -162,7 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
public void run() {
final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
- builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
+ builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes));
for (IntList intList : hbaseColumnsToGTIntList) {
builder.addHbaseColumnsToGT(intList);
}
@@ -188,7 +189,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
try {
- return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
+ return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
} catch (IOException | DataFormatException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5e14474..ba766bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -126,8 +126,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
try {
this.serviceStartTime = System.currentTimeMillis();
- GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class);
- RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class);
+ GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class);
+ RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class);
List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
for (IntList intList : request.getHbaseColumnsToGTList()) {
hbaseColumnsToGT.add(intList.getIntsList());
@@ -161,7 +161,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
byte[] allRows = outputStream.toByteArray();
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
done.run(responseBuilder.//
- setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies
+ setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies
setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
setScannedRowCount(finalScanner.getScannedRowCount()).//
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 92c7e05..6d3ec4d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -51,9 +51,9 @@ import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.cache.TsConditionExtractor;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
import org.apache.kylin.storage.tuple.Tuple;
import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
@@ -67,7 +67,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
/**
*/
@@ -152,7 +152,7 @@ public class EndpointTupleIterator implements ITupleIterator {
//decompress
Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
for (IIProtos.IIResponse input : compressedShardResults) {
- byte[] compressed = input.getBlob().toByteArray();
+ byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
try {
byte[] decompressed = CompressionUtils.decompress(compressed);
shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
@@ -275,13 +275,13 @@ public class EndpointTupleIterator implements ITupleIterator {
if (this.tsRange != null) {
byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
- builder.setTsRange(ByteString.copyFrom(tsRangeBytes));
+ builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
}
- builder.setType(ByteString.copyFrom(CoprocessorRowType.serialize(pushedDownRowType))) //
- .setFilter(ByteString.copyFrom(CoprocessorFilter.serialize(pushedDownFilter))) //
- .setProjector(ByteString.copyFrom(CoprocessorProjector.serialize(pushedDownProjector))) //
- .setAggregator(ByteString.copyFrom(EndpointAggregators.serialize(pushedDownAggregators)));
+ builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
+ .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
+ .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
+ .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));
IIProtos.IIRequest request = builder.build();
@@ -337,10 +337,10 @@ public class EndpointTupleIterator implements ITupleIterator {
}
IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
- byte[] columnsBytes = currentRow.getColumns().toByteArray();
+ byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
if (currentRow.hasMeasures()) {
- byte[] measuresBytes = currentRow.getMeasures().toByteArray();
+ byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getMeasures());
this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e877e835/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 5f8fefe..6173241 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -53,15 +53,15 @@ import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -88,7 +88,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
if (request.hasTsRange()) {
- Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(request.getTsRange().toByteArray());
+ Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
byte[] regionStartKey = region.getStartKey();
if (!ArrayUtils.isEmpty(regionStartKey)) {
shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
@@ -148,15 +148,15 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
innerScanner = region.getScanner(prepareScan(request, region));
- CoprocessorRowType type = CoprocessorRowType.deserialize(request.getType().toByteArray());
- CoprocessorProjector projector = CoprocessorProjector.deserialize(request.getProjector().toByteArray());
- EndpointAggregators aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
- CoprocessorFilter filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
+ CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
+ CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
+ EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
+ CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
//compression
IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter);
byte[] compressed = CompressionUtils.compress(response.toByteArray());
- IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(ByteString.copyFrom(compressed)).build();
+ IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
done.run(compressedR);
} catch (IOException ioe) {
@@ -257,7 +257,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
if (totalByteFormLen >= MEMORY_LIMIT) {
throw new RuntimeException("the query has exceeded the memory limit, please check the query");
}
- IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(recordBuffer));
+ IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(recordBuffer));
responseBuilder.addRows(rowBuilder.build());
totalByteFormLen += byteFormLen;
}
@@ -269,9 +269,9 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
if (needAgg) {
for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
AggrKey aggrKey = entry.getKey();
- IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(ByteString.copyFrom(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
+ IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
int length = aggregators.serializeMetricValues(entry.getValue(), buffer);
- rowBuilder.setMeasures(ByteString.copyFrom(buffer, 0, length));
+ rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, 0, length));
responseBuilder.addRows(rowBuilder.build());
}
}