You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/22 10:26:28 UTC
[2/2] incubator-kylin git commit: print stat and avoid loadallcolumns
print stat and avoid loadallcolumns
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9d256bd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9d256bd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9d256bd3
Branch: refs/heads/KYLIN-942
Commit: 9d256bd370d6d2b7a0e5b553fbd3a5faebc5d419
Parents: e6f2e06
Author: honma <ho...@ebay.com>
Authored: Thu Oct 22 16:30:38 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 22 16:30:38 2015 +0800
----------------------------------------------------------------------
.../kylin/common/util/CompressionUtils.java | 6 ++--
.../org/apache/kylin/gridtable/GTRecord.java | 34 ------------------
.../apache/kylin/query/test/KylinTestBase.java | 2 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 37 ++++++++++++++++----
.../hbase/cube/v2/HBaseReadonlyStore.java | 5 ++-
.../coprocessor/endpoint/CubeVisitService.java | 12 ++++---
6 files changed, 43 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 13abab5..c9838e4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -45,8 +45,7 @@ public class CompressionUtils {
outputStream.close();
byte[] output = outputStream.toByteArray();
- logger.info("Original: " + data.length + " bytes");
- logger.info("Compressed: " + output.length + " bytes");
+ logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes ");
return output;
}
@@ -63,8 +62,7 @@ public class CompressionUtils {
outputStream.close();
byte[] output = outputStream.toByteArray();
- logger.info("Original: " + data.length + " bytes");
- logger.info("Decompressed: " + output.length + " bytes");
+ logger.info("Original: " + data.length + " bytes. " + "Decompressed: " + output.length + " bytes");
return output;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 5312308..0f4eb3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -228,22 +228,6 @@ public class GTRecord implements Comparable<GTRecord> {
buf.setLength(pos);
}
- /**
- * write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not.
- * for saving space
- */
- public void exportAllColumns(ByteBuffer buf) {
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
- if (cols[c] == null || cols[c].array() == null) {
- buf.put((byte) 0);
- } else {
- buf.put((byte) 1);
- buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
- }
- }
- }
-
/** write data to given buffer, like serialize */
public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -269,24 +253,6 @@ public class GTRecord implements Comparable<GTRecord> {
}
/** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadAllColumns(ByteBuffer buf) {
- int pos = buf.position();
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
-
- byte exist = buf.get();
- pos++;
-
- if (exist == 1) {
- int len = info.codeSystem.codeLength(c, buf);
- cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
- pos += len;
- buf.position(pos);
- }
- }
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
int pos = buf.position();
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
index b94542c..0399f8c 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
@@ -208,7 +208,7 @@ public class KylinTestBase {
if (needSort) {
queryTable = new SortedTable(queryTable, columnNames);
}
- printResult(queryTable);
+ //printResult(queryTable);
return queryTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 5a91537..bb2a18a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
import org.apache.kylin.storage.hbase.steps.HBaseConnection;
import com.google.common.base.Function;
@@ -61,10 +62,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
static class EndpointResultsAsGTScanner implements IGTScanner {
private GTInfo info;
private Iterator<byte[]> blocks;
+ private ImmutableBitSet columns;
- public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) {
+ public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns) {
this.info = info;
this.blocks = blocks;
+ this.columns = columns;
}
@Override
@@ -89,7 +92,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public Iterator<GTRecord> apply(@Nullable final byte[] input) {
- logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length);
return new Iterator<GTRecord>() {
private ByteBuffer inputBuffer = null;
private GTRecord oneRecord = null;
@@ -106,7 +108,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public GTRecord next() {
- oneRecord.loadAllColumns(inputBuffer);
+ oneRecord.loadColumns(columns, inputBuffer);
return oneRecord;
}
@@ -151,7 +153,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
}
- for (final RawScan rawScan : rawScans) {
+ for (int i = 0; i < rawScans.size(); ++i) {
+ final int shardIndex = i;
+ final RawScan rawScan = rawScans.get(i);
+
executorService.submit(new Runnable() {
@Override
public void run() {
@@ -169,6 +174,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
}
+ // results.size() is expected to be exactly 1 for each shard
+ if (results.size() != 1) {
+ logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex);
+ }
+
+ for (CubeVisitProtos.CubeVisitResponse result : results) {
+ logger.info(getStatsString(result, shardIndex));
+ }
+
Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
@Nullable
@Override
@@ -193,7 +207,18 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
throw new RuntimeException("Visiting cube by endpoint gets interrupted");
}
- return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator());
+ return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns());
+ }
+
+ private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) {
+ StringBuilder sb = new StringBuilder();
+ Stats stats = result.getStats();
+ sb.append("Shard " + shardIndex + ": ");
+ sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
+ sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
+ sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ");
+ return sb.toString();
+
}
//TODO : async callback ?
@@ -211,8 +236,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
});
- logger.info("{} regions returned results ", results.values().size());
-
return results.values();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index d0551bb..7731f19 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -100,16 +100,15 @@ public class HBaseReadonlyStore implements IGTStore {
@Override
public GTRecord next() {
+ count++;
List<Cell> oneRow = cellListIterator.next();
if (oneRow.size() < 1) {
throw new IllegalStateException("cell list's size less than 1");
}
- ByteBuffer buf;
-
// dimensions, set to primary key, also the 0th column block
Cell firstCell = oneRow.get(0);
- buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
+ ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
oneRecord.loadCellBlock(0, buf);
// metrics
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9d256bd3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index c753911..5e14474 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -146,12 +146,16 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner);
ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
+ int finalRowCount = 0;
for (GTRecord oneRecord : finalScanner) {
buffer.clear();
- oneRecord.exportAllColumns(buffer);
+ oneRecord.exportColumns(scanReq.getColumns(), buffer);
buffer.flip();
+
outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
+ finalRowCount++;
}
//outputStream.close() is not necessary
byte[] allRows = outputStream.toByteArray();
@@ -159,8 +163,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
done.run(responseBuilder.//
setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies
setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
- setAggregatedRowCount(0).//
- setScannedRowCount(0).//
+ setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
+ setScannedRowCount(finalScanner.getScannedRowCount()).//
setServiceStartTime(serviceStartTime).//
setServiceEndTime(System.currentTimeMillis()).build()).//
build());