You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/05 08:29:40 UTC
[1/3] incubator-kylin git commit: KYLIN-942 append more server stats in endpoint response
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1126 [created] ad3622e12
KYLIN-942 append more server stats in endpoint response
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9ac673a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9ac673a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9ac673a2
Branch: refs/heads/KYLIN-1126
Commit: 9ac673a2ec946f1e9d232b6e32bffb5a43fd702b
Parents: cae8a65
Author: honma <ho...@ebay.com>
Authored: Wed Nov 4 16:51:02 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Nov 4 16:51:02 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 13 +-
.../hbase/cube/v1/CubeSegmentTupleIterator.java | 1 -
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 3 +-
.../coprocessor/endpoint/CubeVisitService.java | 16 +-
.../endpoint/generated/CubeVisitProtos.java | 457 ++++++++++++++++++-
.../endpoint/protobuf/CubeVisit.proto | 4 +
6 files changed, 477 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index c60f007..259f977 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.common.util;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.TreeMultiset;
+import com.sun.management.OperatingSystemMXBean;
/**
* <p/>
@@ -72,10 +74,13 @@ public class BasicTest {
}
@Test
- public void testxx() {
- B b= new B();
- b.foo();;
-
+ public void testxx() throws InterruptedException {
+ while (true) {
+ OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+ System.out.println(operatingSystemMXBean.getSystemCpuLoad());
+ System.out.println(operatingSystemMXBean.getFreePhysicalMemorySize());
+ Thread.sleep(1000);
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 034ffac..22f7017 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -254,7 +254,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
flushScanCountDelta();
if (logger.isDebugEnabled() && scan != null) {
- logger.debug("Scan " + scan.toString());
byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
if (metricsBytes != null) {
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6ad30e2..a8894a1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -227,10 +227,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) {
StringBuilder sb = new StringBuilder();
Stats stats = result.getStats();
- sb.append("Shard " + shardIndex + ": ");
+ sb.append("Shard " + shardIndex + " on host: " + stats.getHostname());
sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ");
+ sb.append("Server CPU usage: " + stats.getSystemCpuLoad() + ", server physical mem left: " + stats.getFreePhysicalMemorySize() + ", server swap mem left:" + stats.getFreeSwapSpaceSize());
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index f474139..31d1901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -20,6 +20,8 @@ package org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -56,6 +58,7 @@ import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import com.sun.management.OperatingSystemMXBean;
@SuppressWarnings("unused")
//used in hbase endpoint
@@ -165,6 +168,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
finalRowCount++;
}
+
+ OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+ double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
+ double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
+ double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
+
//outputStream.close() is not necessary
byte[] allRows = outputStream.toByteArray();
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
@@ -174,7 +183,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
setScannedRowCount(finalScanner.getScannedRowCount()).//
setServiceStartTime(serviceStartTime).//
- setServiceEndTime(System.currentTimeMillis()).build()).//
+ setServiceEndTime(System.currentTimeMillis()).//
+ setSystemCpuLoad(systemCpuLoad).//
+ setFreePhysicalMemorySize(freePhysicalMemorySize).//
+ setFreeSwapSpaceSize(freeSwapSpaceSize).//
+ setHostname(InetAddress.getLocalHost().getHostName()).//
+ build()).//
build());
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 0923484..4e69dcc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -1776,6 +1776,51 @@ public final class CubeVisitProtos {
* <code>optional int32 aggregatedRowCount = 4;</code>
*/
int getAggregatedRowCount();
+
+ // optional double systemCpuLoad = 5;
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ boolean hasSystemCpuLoad();
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ double getSystemCpuLoad();
+
+ // optional double freePhysicalMemorySize = 6;
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ boolean hasFreePhysicalMemorySize();
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ double getFreePhysicalMemorySize();
+
+ // optional double freeSwapSpaceSize = 7;
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ boolean hasFreeSwapSpaceSize();
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ double getFreeSwapSpaceSize();
+
+ // optional string hostname = 8;
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ boolean hasHostname();
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ java.lang.String getHostname();
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ com.google.protobuf.ByteString
+ getHostnameBytes();
}
/**
* Protobuf type {@code CubeVisitResponse.Stats}
@@ -1848,6 +1893,26 @@ public final class CubeVisitProtos {
aggregatedRowCount_ = input.readInt32();
break;
}
+ case 41: {
+ bitField0_ |= 0x00000010;
+ systemCpuLoad_ = input.readDouble();
+ break;
+ }
+ case 49: {
+ bitField0_ |= 0x00000020;
+ freePhysicalMemorySize_ = input.readDouble();
+ break;
+ }
+ case 57: {
+ bitField0_ |= 0x00000040;
+ freeSwapSpaceSize_ = input.readDouble();
+ break;
+ }
+ case 66: {
+ bitField0_ |= 0x00000080;
+ hostname_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1952,11 +2017,106 @@ public final class CubeVisitProtos {
return aggregatedRowCount_;
}
+ // optional double systemCpuLoad = 5;
+ public static final int SYSTEMCPULOAD_FIELD_NUMBER = 5;
+ private double systemCpuLoad_;
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public boolean hasSystemCpuLoad() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public double getSystemCpuLoad() {
+ return systemCpuLoad_;
+ }
+
+ // optional double freePhysicalMemorySize = 6;
+ public static final int FREEPHYSICALMEMORYSIZE_FIELD_NUMBER = 6;
+ private double freePhysicalMemorySize_;
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public boolean hasFreePhysicalMemorySize() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public double getFreePhysicalMemorySize() {
+ return freePhysicalMemorySize_;
+ }
+
+ // optional double freeSwapSpaceSize = 7;
+ public static final int FREESWAPSPACESIZE_FIELD_NUMBER = 7;
+ private double freeSwapSpaceSize_;
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public boolean hasFreeSwapSpaceSize() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public double getFreeSwapSpaceSize() {
+ return freeSwapSpaceSize_;
+ }
+
+ // optional string hostname = 8;
+ public static final int HOSTNAME_FIELD_NUMBER = 8;
+ private java.lang.Object hostname_;
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public boolean hasHostname() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public java.lang.String getHostname() {
+ java.lang.Object ref = hostname_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ hostname_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public com.google.protobuf.ByteString
+ getHostnameBytes() {
+ java.lang.Object ref = hostname_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ hostname_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
serviceStartTime_ = 0L;
serviceEndTime_ = 0L;
scannedRowCount_ = 0;
aggregatedRowCount_ = 0;
+ systemCpuLoad_ = 0D;
+ freePhysicalMemorySize_ = 0D;
+ freeSwapSpaceSize_ = 0D;
+ hostname_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1982,6 +2142,18 @@ public final class CubeVisitProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeInt32(4, aggregatedRowCount_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeDouble(5, systemCpuLoad_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeDouble(6, freePhysicalMemorySize_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeDouble(7, freeSwapSpaceSize_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeBytes(8, getHostnameBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -2007,6 +2179,22 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(4, aggregatedRowCount_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(5, systemCpuLoad_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(6, freePhysicalMemorySize_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(7, freeSwapSpaceSize_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(8, getHostnameBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -2050,6 +2238,23 @@ public final class CubeVisitProtos {
result = result && (getAggregatedRowCount()
== other.getAggregatedRowCount());
}
+ result = result && (hasSystemCpuLoad() == other.hasSystemCpuLoad());
+ if (hasSystemCpuLoad()) {
+ result = result && (Double.doubleToLongBits(getSystemCpuLoad()) == Double.doubleToLongBits(other.getSystemCpuLoad()));
+ }
+ result = result && (hasFreePhysicalMemorySize() == other.hasFreePhysicalMemorySize());
+ if (hasFreePhysicalMemorySize()) {
+ result = result && (Double.doubleToLongBits(getFreePhysicalMemorySize()) == Double.doubleToLongBits(other.getFreePhysicalMemorySize()));
+ }
+ result = result && (hasFreeSwapSpaceSize() == other.hasFreeSwapSpaceSize());
+ if (hasFreeSwapSpaceSize()) {
+ result = result && (Double.doubleToLongBits(getFreeSwapSpaceSize()) == Double.doubleToLongBits(other.getFreeSwapSpaceSize()));
+ }
+ result = result && (hasHostname() == other.hasHostname());
+ if (hasHostname()) {
+ result = result && getHostname()
+ .equals(other.getHostname());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -2079,6 +2284,25 @@ public final class CubeVisitProtos {
hash = (37 * hash) + AGGREGATEDROWCOUNT_FIELD_NUMBER;
hash = (53 * hash) + getAggregatedRowCount();
}
+ if (hasSystemCpuLoad()) {
+ hash = (37 * hash) + SYSTEMCPULOAD_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(
+ Double.doubleToLongBits(getSystemCpuLoad()));
+ }
+ if (hasFreePhysicalMemorySize()) {
+ hash = (37 * hash) + FREEPHYSICALMEMORYSIZE_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(
+ Double.doubleToLongBits(getFreePhysicalMemorySize()));
+ }
+ if (hasFreeSwapSpaceSize()) {
+ hash = (37 * hash) + FREESWAPSPACESIZE_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(
+ Double.doubleToLongBits(getFreeSwapSpaceSize()));
+ }
+ if (hasHostname()) {
+ hash = (37 * hash) + HOSTNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getHostname().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -2196,6 +2420,14 @@ public final class CubeVisitProtos {
bitField0_ = (bitField0_ & ~0x00000004);
aggregatedRowCount_ = 0;
bitField0_ = (bitField0_ & ~0x00000008);
+ systemCpuLoad_ = 0D;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ freePhysicalMemorySize_ = 0D;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ freeSwapSpaceSize_ = 0D;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ hostname_ = "";
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -2240,6 +2472,22 @@ public final class CubeVisitProtos {
to_bitField0_ |= 0x00000008;
}
result.aggregatedRowCount_ = aggregatedRowCount_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.systemCpuLoad_ = systemCpuLoad_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.freePhysicalMemorySize_ = freePhysicalMemorySize_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.freeSwapSpaceSize_ = freeSwapSpaceSize_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.hostname_ = hostname_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -2268,6 +2516,20 @@ public final class CubeVisitProtos {
if (other.hasAggregatedRowCount()) {
setAggregatedRowCount(other.getAggregatedRowCount());
}
+ if (other.hasSystemCpuLoad()) {
+ setSystemCpuLoad(other.getSystemCpuLoad());
+ }
+ if (other.hasFreePhysicalMemorySize()) {
+ setFreePhysicalMemorySize(other.getFreePhysicalMemorySize());
+ }
+ if (other.hasFreeSwapSpaceSize()) {
+ setFreeSwapSpaceSize(other.getFreeSwapSpaceSize());
+ }
+ if (other.hasHostname()) {
+ bitField0_ |= 0x00000080;
+ hostname_ = other.hostname_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -2427,6 +2689,179 @@ public final class CubeVisitProtos {
return this;
}
+ // optional double systemCpuLoad = 5;
+ private double systemCpuLoad_ ;
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public boolean hasSystemCpuLoad() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public double getSystemCpuLoad() {
+ return systemCpuLoad_;
+ }
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public Builder setSystemCpuLoad(double value) {
+ bitField0_ |= 0x00000010;
+ systemCpuLoad_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional double systemCpuLoad = 5;</code>
+ */
+ public Builder clearSystemCpuLoad() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ systemCpuLoad_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ // optional double freePhysicalMemorySize = 6;
+ private double freePhysicalMemorySize_ ;
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public boolean hasFreePhysicalMemorySize() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public double getFreePhysicalMemorySize() {
+ return freePhysicalMemorySize_;
+ }
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public Builder setFreePhysicalMemorySize(double value) {
+ bitField0_ |= 0x00000020;
+ freePhysicalMemorySize_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional double freePhysicalMemorySize = 6;</code>
+ */
+ public Builder clearFreePhysicalMemorySize() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ freePhysicalMemorySize_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ // optional double freeSwapSpaceSize = 7;
+ private double freeSwapSpaceSize_ ;
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public boolean hasFreeSwapSpaceSize() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public double getFreeSwapSpaceSize() {
+ return freeSwapSpaceSize_;
+ }
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public Builder setFreeSwapSpaceSize(double value) {
+ bitField0_ |= 0x00000040;
+ freeSwapSpaceSize_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional double freeSwapSpaceSize = 7;</code>
+ */
+ public Builder clearFreeSwapSpaceSize() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ freeSwapSpaceSize_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ // optional string hostname = 8;
+ private java.lang.Object hostname_ = "";
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public boolean hasHostname() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public java.lang.String getHostname() {
+ java.lang.Object ref = hostname_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ hostname_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public com.google.protobuf.ByteString
+ getHostnameBytes() {
+ java.lang.Object ref = hostname_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ hostname_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public Builder setHostname(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000080;
+ hostname_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public Builder clearHostname() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ hostname_ = getDefaultInstance().getHostname();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string hostname = 8;</code>
+ */
+ public Builder setHostnameBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000080;
+ hostname_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
}
@@ -3224,17 +3659,19 @@ public final class CubeVisitProtos {
"ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
"canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\0223" +
"\n\020hbaseColumnsToGT\030\004 \003(\0132\031.CubeVisitRequ" +
- "est.IntList\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\304\001\n" +
+ "est.IntList\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\251\002\n" +
"\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002" +
"(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" +
- "ats\032n\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022\026",
- "\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCoun" +
- "t\030\003 \001(\005\022\032\n\022aggregatedRowCount\030\004 \001(\0052F\n\020C" +
- "ubeVisitService\0222\n\tvisitCube\022\021.CubeVisit" +
- "Request\032\022.CubeVisitResponseB`\nEorg.apach" +
- "e.kylin.storage.hbase.cube.v2.coprocesso" +
- "r.endpoint.generatedB\017CubeVisitProtosH\001\210" +
- "\001\001\240\001\001"
+ "ats\032\322\001\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022",
+ "\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" +
+ "nt\030\003 \001(\005\022\032\n\022aggregatedRowCount\030\004 \001(\005\022\025\n\r" +
+ "systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" +
+ "ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" +
+ "\n\010hostname\030\010 \001(\t2F\n\020CubeVisitService\0222\n\t" +
+ "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
+ "ResponseB`\nEorg.apache.kylin.storage.hba" +
+ "se.cube.v2.coprocessor.endpoint.generate" +
+ "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3264,7 +3701,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitResponse_Stats_descriptor,
- new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", });
+ new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9ac673a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 4ac6414..a6ad308 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -27,6 +27,10 @@ message CubeVisitResponse {
optional int64 serviceEndTime = 2;
optional int32 scannedRowCount = 3;
optional int32 aggregatedRowCount = 4;
+ optional double systemCpuLoad = 5;
+ optional double freePhysicalMemorySize = 6;
+ optional double freeSwapSpaceSize = 7;
+ optional string hostname = 8;
}
required bytes compressedRows = 1;
required Stats stats = 2;
[2/3] incubator-kylin git commit: next step: enable rowkeyencoder deal with gtrecord
Posted by ma...@apache.org.
next step: enable rowkeyencoder deal with gtrecord
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6a9c7413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6a9c7413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6a9c7413
Branch: refs/heads/KYLIN-1126
Commit: 6a9c741383d607e6cb7e5d552c8377f7c29c0db7
Parents: 9ac673a
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 11:12:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 11:12:58 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 18 +++---
.../kylin/cube/kv/AbstractRowKeyEncoder.java | 16 ++----
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 12 ++--
.../org/apache/kylin/cube/kv/RowConstants.java | 1 +
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 59 +++++++++++---------
.../coprocessor/CoprocessorProjector.java | 5 +-
6 files changed, 60 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a44fcf..bd36dfa 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -69,6 +69,8 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
private String lastBuildJobID;
@JsonProperty("create_time_utc")
private long createTimeUTC;
+ @JsonProperty("enable_sharding")
+ private boolean enableSharding = true;
@JsonProperty("cuboid_shard_nums")
private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
@JsonProperty("total_shards")
@@ -368,6 +370,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
return cubeInstance.getStorageType();
}
+ public boolean isEnableSharding() {
+ return enableSharding;
+ }
+
+ public void setEnableSharding(boolean enableSharding) {
+ this.enableSharding = enableSharding;
+ }
+
/**
* get the number of shards where each cuboid will distribute
* @return
@@ -381,14 +391,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
}
}
- // /**
- // * get the number of shards where each cuboid will distribute
- // * @return
- // */
- // public Map<Long, Short> getCuboidShards() {
- // return this.cuboidShards;
- // }
-
public void setCuboidShardNums(Map<Long, Short> newCuboidShards) {
this.cuboidShardNums = newCuboidShards;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index 231f737..b7de983 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -34,30 +34,26 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractRowKeyEncoder {
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
- protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
+ protected final Cuboid cuboid;
+ protected final CubeSegment cubeSeg;
+ protected byte blankByte = DEFAULT_BLANK_BYTE;
public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
return new RowKeyEncoder(cubeSeg, cuboid);
}
- protected final Cuboid cuboid;
- protected byte blankByte = DEFAULT_BLANK_BYTE;
- protected boolean encodeShard = true;
-
- protected AbstractRowKeyEncoder(Cuboid cuboid) {
+ protected AbstractRowKeyEncoder(CubeSegment cubeSeg,Cuboid cuboid) {
this.cuboid = cuboid;
+ this.cubeSeg = cubeSeg;
}
public void setBlankByte(byte blankByte) {
this.blankByte = blankByte;
}
- public void setEncodeShard(boolean encodeShard) {
- this.encodeShard = encodeShard;
- }
-
abstract public byte[] encode(Map<TblColRef, String> valueMap);
abstract public byte[] encode(byte[][] values);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index bf67538..7748e8c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -36,11 +36,15 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
- protected int fillHeader(byte[] bytes) {
- Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+ protected void fillHeader(byte[] bytes) {
+ int offset = 0;
+ if (enableSharding) {
+ Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
+ }
// always fuzzy match cuboid ID to lock on the selected cuboid
- Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO);
- return this.headerLength;
+ int headerLength = this.getHeaderLength();
+ Arrays.fill(bytes, offset, headerLength, RowConstants.BYTE_ZERO);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 6a8eeb5..09bccc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -33,6 +33,7 @@ public class RowConstants {
public static final int ROWKEY_SHARDID_LEN = 2;
public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
+ public static final int ROWKEY_SHARD_AND_CUBOID_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
public static final byte BYTE_ZERO = 0;
public static final byte BYTE_ONE = 1;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 0676df6..059288a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -32,18 +32,35 @@ import org.apache.kylin.metadata.model.TblColRef;
public class RowKeyEncoder extends AbstractRowKeyEncoder {
- private int bytesLength;
- protected int headerLength;
+ private int bodyLength = 0;
private RowKeyColumnIO colIO;
- CubeSegment cubeSeg;
+ protected boolean enableSharding;
protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
- super(cuboid);
- this.cubeSeg = cubeSeg;
+ super(cubeSeg, cuboid);
+ enableSharding = cubeSeg.isEnableSharding();
colIO = new RowKeyColumnIO(cubeSeg);
- bytesLength = headerLength = RowConstants.ROWKEY_HEADER_LEN; // include shard and cuboidid
for (TblColRef column : cuboid.getColumns()) {
- bytesLength += colIO.getColumnLength(column);
+ bodyLength += colIO.getColumnLength(column);
+ }
+ }
+
+ protected int getHeaderLength() {
+ return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
+ }
+
+ protected int getBytesLength() {
+ return getHeaderLength() + bodyLength;
+ }
+
+ protected short calculateShard(byte[] key) {
+ if (enableSharding) {
+ int bodyOffset = RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN;
+ short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+ short shardOffset = ShardingHash.getShard(key, bodyOffset, bodyLength, cuboidShardNum);
+ return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
+ } else {
+ throw new RuntimeException("If enableSharding false, you should never caculate shard");
}
}
@@ -71,9 +88,8 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
@Override
public byte[] encode(byte[][] values) {
- byte[] bytes = new byte[this.bytesLength];
- int bodyOffset = RowConstants.ROWKEY_HEADER_LEN;
- int offset = bodyOffset;
+ byte[] bytes = new byte[this.getBytesLength()];
+ int offset = getHeaderLength();
for (int i = 0; i < cuboid.getColumns().size(); i++) {
TblColRef column = cuboid.getColumns().get(i);
@@ -93,27 +109,18 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
return bytes;
}
- protected int fillHeader(byte[] bytes) {
+ protected void fillHeader(byte[] bytes) {
int offset = 0;
- if (encodeShard) {
- short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
- short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
- short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
- BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
- } else {
- BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ if (enableSharding) {
+ short shard = calculateShard(bytes);
+ BytesUtil.writeShort(shard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ offset += RowConstants.ROWKEY_SHARDID_LEN;
}
- offset += RowConstants.ROWKEY_SHARDID_LEN;
System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
- offset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- if (this.headerLength != offset) {
- throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
- }
-
- return offset;
+ //offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+ //return offset;
}
protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6a9c7413/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 9b839c3..c37b2f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,9 +41,8 @@ public class CoprocessorProjector {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
- protected int fillHeader(byte[] bytes) {
- Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
- return this.headerLength;
+ protected void fillHeader(byte[] bytes) {
+ Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff);
}
@Override
[3/3] incubator-kylin git commit: pending for test
Posted by ma...@apache.org.
pending for test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ad3622e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ad3622e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ad3622e1
Branch: refs/heads/KYLIN-1126
Commit: ad3622e12bbe71300b3b36e4802dd3bdccfee878
Parents: 6a9c741
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 15:33:24 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 15:33:24 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 5 +
.../org/apache/kylin/cube/cuboid/Cuboid.java | 18 +-
.../kylin/cube/kv/AbstractRowKeyEncoder.java | 18 +-
.../apache/kylin/cube/kv/FuzzyKeyEncoder.java | 17 +-
.../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 29 +++
.../apache/kylin/cube/kv/LazyRowKeyEncoder.java | 67 ++++++
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 30 ++-
.../org/apache/kylin/gridtable/GTRecord.java | 20 ++
.../kylin/gridtable/GTScanRangePlanner.java | 2 +-
.../kylin/storage/translate/HBaseKeyRange.java | 8 +-
.../mr/steps/MapContextGTRecordWriter.java | 35 +--
.../common/coprocessor/CoprocessorRowType.java | 17 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 100 ++++----
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +-
.../hbase/cube/v2/HBaseReadonlyStore.java | 7 +-
.../kylin/storage/hbase/cube/v2/HBaseScan.java | 88 -------
.../coprocessor/endpoint/CubeVisitService.java | 2 +-
.../endpoint/generated/CubeVisitProtos.java | 227 +++++++++++++------
.../endpoint/protobuf/CubeVisit.proto | 3 +-
.../storage/hbase/steps/CreateHTableJob.java | 14 +-
.../storage/hbase/steps/HBaseCuboidWriter.java | 45 ++--
.../hbase/steps/HBaseStreamingOutput.java | 6 +
.../observer/AggregateRegionObserverTest.java | 2 +-
23 files changed, 446 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index bd36dfa..1a34596 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
@@ -378,6 +379,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
this.enableSharding = enableSharding;
}
+ public int getRowKeyPreambleSize() {
+ return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
+ }
+
/**
* get the number of shards where each cuboid will distribute
* @return
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 2c8680d..d7e7d9c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -18,13 +18,23 @@
package org.apache.kylin.cube.cuboid;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.*;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.HierarchyDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
import org.apache.kylin.metadata.model.TblColRef;
@@ -147,7 +157,7 @@ public class Cuboid implements Comparable<Cuboid> {
return cuboidID;
} else {
// no column (except mandatory), add one column
- long toAddCol = (1 << (BitSet.valueOf(new long[]{rowkey.getTailMask()}).cardinality()));
+ long toAddCol = (1 << (BitSet.valueOf(new long[] { rowkey.getTailMask() }).cardinality()));
// check if the toAddCol belongs to any hierarchy
List<HierarchyMask> hierarchyMaskList = rowkey.getHierarchyMasks();
if (hierarchyMaskList != null && hierarchyMaskList.size() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index b7de983..c9a304e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -20,9 +20,11 @@ package org.apache.kylin.cube.kv;
import java.util.Map;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +47,7 @@ public abstract class AbstractRowKeyEncoder {
return new RowKeyEncoder(cubeSeg, cuboid);
}
- protected AbstractRowKeyEncoder(CubeSegment cubeSeg,Cuboid cuboid) {
+ protected AbstractRowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
this.cuboid = cuboid;
this.cubeSeg = cubeSeg;
}
@@ -54,6 +56,20 @@ public abstract class AbstractRowKeyEncoder {
this.blankByte = blankByte;
}
+ public long getCuboidID() {
+ return cuboid.getId();
+ }
+
+ abstract public byte[] createBuf();
+
+ /**
+ * encode a gtrecord into a given byte[] buffer
+ * @param record
+ * @param keyColumns
+ * @param buf
+ */
+ abstract public void encode(GTRecord record, ImmutableBitSet keyColumns, byte[] buf);
+
abstract public byte[] encode(Map<TblColRef, String> valueMap);
abstract public byte[] encode(byte[][] values);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
index 2185bc5..9da8ff5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -18,8 +18,6 @@
package org.apache.kylin.cube.kv;
-import java.util.Arrays;
-
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -35,9 +33,16 @@ public class FuzzyKeyEncoder extends RowKeyEncoder {
}
@Override
- protected byte[] defaultValue(int length) {
- byte[] keyBytes = new byte[length];
- Arrays.fill(keyBytes, RowConstants.BYTE_ZERO);
- return keyBytes;
+ protected short calculateShard(byte[] key) {
+ if (enableSharding) {
+ return 0;
+ } else {
+ throw new RuntimeException("If enableSharding false, you should never calculate shard");
+ }
+ }
+
+ @Override
+ protected byte defaultValue() {
+ return RowConstants.BYTE_ZERO;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 7748e8c..94db94b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -20,8 +20,12 @@ package org.apache.kylin.cube.kv;
import java.util.Arrays;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
/**
@@ -36,6 +40,31 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
+ public void encode(GTRecord record, ImmutableBitSet keyColumns, byte[] buf) {
+ ByteArray byteArray = new ByteArray(buf, getHeaderLength(), 0);
+
+ GTInfo info = record.getInfo();
+ byte fill;
+ int pos = 0;
+ for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+ int c = info.getPrimaryKey().trueBitAt(i);
+ int colLength = info.getCodeSystem().maxCodeLength(c);
+
+ if (record.get(c).array() != null) {
+ fill = RowConstants.BYTE_ZERO;
+ } else {
+ fill = RowConstants.BYTE_ONE;
+ }
+ Arrays.fill(byteArray.array(), byteArray.offset() + pos, byteArray.offset() + pos + colLength, fill);
+ pos += colLength;
+ }
+ byteArray.setLength(pos);
+
+ //fill shard and cuboid
+ fillHeader(buf);
+ }
+
+ @Override
protected void fillHeader(byte[] bytes) {
int offset = 0;
if (enableSharding) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
new file mode 100644
index 0000000..7c70fff
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A LazyRowKeyEncoder does not calculate the shard itself.
+ * It works whether or not sharding is enabled.
+ * It is typically used when scanning a sharded cube: the shard id is rewritten later for every possible shard.
+ */
+public class LazyRowKeyEncoder extends RowKeyEncoder {
+ public LazyRowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
+ super(cubeSeg, cuboid);
+ }
+
+ protected short calculateShard(byte[] key) {
+ if (enableSharding) {
+ return 0;
+ } else {
+ throw new RuntimeException("If enableSharding false, you should never calculate shard");
+ }
+ }
+
+ //for non-sharding cases it returns a single byte[] with no shard id at the beginning
+ public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
+ final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+ if (!enableSharding) {
+ return Lists.newArrayList(halfCookedKey);//no shard id to write at the head, so the key is already fully cooked
+ } else {
+ List<byte[]> ret = Lists.newArrayList();
+ for (short i = 0; i < cuboidShardNum; ++i) {
+ short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+ byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
+ BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ ret.add(cookedKey);
+ }
+ return ret;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 059288a..4d1055b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -23,11 +23,14 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
public class RowKeyEncoder extends AbstractRowKeyEncoder {
@@ -46,7 +49,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
}
protected int getHeaderLength() {
- return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
+ return cubeSeg.getRowKeyPreambleSize();
}
protected int getBytesLength() {
@@ -60,7 +63,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
short shardOffset = ShardingHash.getShard(key, bodyOffset, bodyLength, cuboidShardNum);
return ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards());
} else {
- throw new RuntimeException("If enableSharding false, you should never caculate shard");
+ throw new RuntimeException("If enableSharding false, you should never calculate shard");
}
}
@@ -69,6 +72,20 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
}
@Override
+ public byte[] createBuf() {
+ return new byte[this.getBytesLength()];
+ }
+
+ @Override
+ public void encode(GTRecord record, ImmutableBitSet keyColumns, byte[] buf) {
+ ByteArray byteArray = new ByteArray(buf, getHeaderLength(), 0);
+ record.exportColumns(keyColumns, byteArray, defaultValue());
+
+ //fill shard and cuboid
+ fillHeader(buf);
+ }
+
+ @Override
public byte[] encode(Map<TblColRef, String> valueMap) {
List<byte[]> valueList = new ArrayList<byte[]>();
for (TblColRef bdCol : cuboid.getColumns()) {
@@ -126,18 +143,15 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
// special null value case
if (value == null) {
- byte[] valueBytes = defaultValue(columnLen);
- System.arraycopy(valueBytes, 0, outputValue, outputValueOffset, columnLen);
+ Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, defaultValue());
return;
}
colIO.writeColumn(column, value, valueLen, this.blankByte, outputValue, outputValueOffset);
}
- protected byte[] defaultValue(int length) {
- byte[] values = new byte[length];
- Arrays.fill(values, this.blankByte);
- return values;
+ protected byte defaultValue() {
+ return this.blankByte;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 0f4eb3d..98f6e2d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -7,6 +7,8 @@ import java.util.List;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import com.google.common.base.Preconditions;
+
public class GTRecord implements Comparable<GTRecord> {
final GTInfo info;
@@ -222,12 +224,30 @@ public class GTRecord implements Comparable<GTRecord> {
int pos = 0;
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
int c = selectedCols.trueBitAt(i);
+ Preconditions.checkNotNull(cols[c].array());
System.arraycopy(cols[c].array(), cols[c].offset(), buf.array(), buf.offset() + pos, cols[c].length());
pos += cols[c].length();
}
buf.setLength(pos);
}
+ /** write data to given buffer, like serialize, use defaultValue when required column is not set*/
+ public void exportColumns(ImmutableBitSet selectedCols, ByteArray buf, byte defaultValue) {
+ int pos = 0;
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+ if (cols[c].array() != null) {
+ System.arraycopy(cols[c].array(), cols[c].offset(), buf.array(), buf.offset() + pos, cols[c].length());
+ pos += cols[c].length();
+ } else {
+ int maxLength = info.codeSystem.maxCodeLength(c);
+ Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + maxLength, defaultValue);
+ pos += maxLength;
+ }
+ }
+ buf.setLength(pos);
+ }
+
/** write data to given buffer, like serialize */
public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index d860090..3d07623 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -553,7 +553,7 @@ public class GTScanRangePlanner {
/**
* asymmetric means compare(a,b) > 0 does not cause compare(b,a) < 0
- * so min max functions will not bu supported
+ * so min max functions will not be supported
*/
private static class AsymmetricRecordComparator extends RecordComparator {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index a6d78e7..fbb258f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -34,6 +34,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
+import org.apache.kylin.cube.kv.LazyRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -118,15 +119,10 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
}
}
- AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid);
- encoder.setEncodeShard(false);//will enumerate all possible shards when scanning
-
+ AbstractRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
-
this.startKey = encoder.encode(startValues);
-
encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
-
// In order to make stopRow inclusive add a trailing 0 byte. #See Scan.setStopRow(byte [] stopRow)
this.stopKey = Bytes.add(encoder.encode(stopValues), ZERO_TAIL_BYTES);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 4c743fb..6098381 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -7,16 +7,14 @@ import java.util.BitSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.model.TblColRef;
/**
*/
@@ -28,7 +26,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
protected CubeSegment cubeSegment;
protected CubeDesc cubeDesc;
- private int bytesLength;
+ private AbstractRowKeyEncoder rowKeyEncoder;
private int dimensions;
private int measureCount;
private byte[] keyBuf;
@@ -61,25 +59,13 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
cuboidRowCount++;
- int header = RowConstants.ROWKEY_HEADER_LEN;
- int offSet = header;
- for (int x = 0; x < dimensions; x++) {
- System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
- offSet += record.get(x).length();
- }
-
- //fill shard
- short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
- short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum);
- short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
- short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
- BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
//output measures
valueBuf.clear();
record.exportColumns(measureColumnsIndex, valueBuf);
- outputKey.set(keyBuf, 0, offSet);
+ outputKey.set(keyBuf, 0, keyBuf.length);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
try {
mapContext.write(outputKey, outputValue);
@@ -95,24 +81,17 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
@Override
public void close() {
-
+
}
private void initVariables(Long cuboidId) {
- bytesLength = RowConstants.ROWKEY_HEADER_LEN;
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
- for (TblColRef column : cuboid.getColumns()) {
- bytesLength += cubeSegment.getColumnLength(column);
- }
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+ keyBuf = rowKeyEncoder.createBuf();
- keyBuf = new byte[bytesLength];
dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality();
measureColumnsIndex = new int[measureCount];
for (int i = 0; i < measureCount; i++) {
measureColumnsIndex[i] = dimensions + i;
}
-
- //write cuboid id first
- BytesUtil.writeLong(cuboidId, keyBuf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
index 7ec97c0..35488d1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
@@ -26,7 +26,6 @@ import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -47,7 +46,9 @@ public class CoprocessorRowType {
for (int i = 0; i < cols.size(); i++) {
colSizes[i] = tableRecordInfo.getDigest().length(i);
}
- return new CoprocessorRowType(cols.toArray(new TblColRef[cols.size()]), colSizes);
+
+ //TODO:check0
+ return new CoprocessorRowType(cols.toArray(new TblColRef[cols.size()]), colSizes, 0);
}
//for observer
@@ -59,7 +60,7 @@ public class CoprocessorRowType {
for (int i = 0; i < cols.length; i++) {
colSizes[i] = colIO.getColumnLength(cols[i]);
}
- return new CoprocessorRowType(cols, colSizes);
+ return new CoprocessorRowType(cols, colSizes, seg.getRowKeyPreambleSize());
}
public static byte[] serialize(CoprocessorRowType o) {
@@ -82,6 +83,7 @@ public class CoprocessorRowType {
public void serialize(CoprocessorRowType o, ByteBuffer out) {
int n = o.columns.length;
BytesUtil.writeVInt(o.columns.length, out);
+ BytesUtil.writeVInt(o.bodyOffset, out);
for (int i = 0; i < n; i++) {
BytesUtil.writeAsciiString(o.columns[i].getTable(), out);
BytesUtil.writeAsciiString(o.columns[i].getName(), out);
@@ -92,6 +94,7 @@ public class CoprocessorRowType {
@Override
public CoprocessorRowType deserialize(ByteBuffer in) {
int n = BytesUtil.readVInt(in);
+ int bodyOffset = BytesUtil.readVInt(in);
TblColRef[] cols = new TblColRef[n];
int[] colSizes = new int[n];
for (int i = 0; i < n; i++) {
@@ -108,18 +111,20 @@ public class CoprocessorRowType {
int colSize = BytesUtil.readVInt(in);
colSizes[i] = colSize;
}
- return new CoprocessorRowType(cols, colSizes);
+ return new CoprocessorRowType(cols, colSizes, bodyOffset);
}
}
// ============================================================================
public TblColRef[] columns;
+ private int bodyOffset;
public int[] columnSizes;
public int[] columnOffsets;
public HashMap<TblColRef, Integer> columnIdxMap;
- public CoprocessorRowType(TblColRef[] columns, int[] columnSizes) {
+ public CoprocessorRowType(TblColRef[] columns, int[] columnSizes, int bodyOffset) {
+ this.bodyOffset = bodyOffset;
this.columns = columns;
this.columnSizes = columnSizes;
init();
@@ -131,7 +136,7 @@ public class CoprocessorRowType {
private void init() {
int[] offsets = new int[columns.length];
- int o = RowConstants.ROWKEY_HEADER_LEN;
+ int o = bodyOffset;
for (int i = 0; i < columns.length; i++) {
offsets[i] = o;
o += columnSizes[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 1d217ac..412e7602 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -5,20 +5,22 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.hadoop.hbase.Cell;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
+import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
+import org.apache.kylin.cube.kv.LazyRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
@@ -29,6 +31,7 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -41,11 +44,16 @@ public abstract class CubeHBaseRPC {
final protected CubeSegment cubeSeg;
final protected Cuboid cuboid;
final protected GTInfo fullGTInfo;
+ final private RowKeyEncoder fuzzyKeyEncoder;
+ final private RowKeyEncoder fuzzyMaskEncoder;
public CubeHBaseRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
+
+ this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
+ this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
}
abstract IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException;
@@ -76,23 +84,34 @@ public abstract class CubeHBaseRPC {
final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
List<RawScan> ret = Lists.newArrayList();
- byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE);
- byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE);
+ LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
+ byte[] start = encoder.createBuf();
+ byte[] end = encoder.createBuf();
+ List<byte[]> startKeys;
+ List<byte[]> endKeys;
+
+ encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
+ encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
+ startKeys = encoder.getRowKeysDifferentShards(start);
+
+ encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
+ encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
+ endKeys = encoder.getRowKeysDifferentShards(end);
+ endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(byte[] input) {
+ byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
+ System.arraycopy(input, 0, shardEnd, 0, input.length);
+ return shardEnd;
+ }
+ });
+
+ Preconditions.checkState(startKeys.size() == endKeys.size());
List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
- short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
-
- for (short i = 0; i < cuboidShardNum; ++i) {
- short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
-
- byte[] shardStart = Arrays.copyOf(start, start.length);
- byte[] shardEnd = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning
- System.arraycopy(end, 0, shardEnd, 0, end.length);
-
- BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN);
- BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN);
-
- ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys));
+ for (short i = 0; i < startKeys.size(); ++i) {
+ ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys));
}
return ret;
@@ -108,30 +127,12 @@ public abstract class CubeHBaseRPC {
}
List<Pair<byte[], byte[]>> ret = Lists.newArrayList();
- int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey());
for (GTRecord gtRecordFuzzyKey : fuzzyKeys) {
- byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
- byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
-
- int pos = 0;
- //shard part
- Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed
- pos += RowConstants.ROWKEY_SHARDID_LEN;
-
- //cuboid part
- Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO);
- System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN);
- pos += RowConstants.ROWKEY_CUBOIDID_LEN;
+ byte[] hbaseFuzzyKey = fuzzyKeyEncoder.createBuf();
+ byte[] hbaseFuzzyMask = fuzzyMaskEncoder.createBuf();
- //row key core part
- ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO);
- System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length());
- ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey);
- System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length());
-
- Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length");
- pos += coreKey.length();
- Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated");
+ fuzzyKeyEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyKey);
+ fuzzyMaskEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyMask);
ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
}
@@ -139,21 +140,6 @@ public abstract class CubeHBaseRPC {
return ret;
}
- private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) {
- ByteArray pk = HBaseScan.exportScanKey(pkRec, fill);
-
- byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN];
- Arrays.fill(buf, fill);
-
- //for scanning/reading, later all possible shard will be applied
-
- System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
- if (pk != null && pk.array() != null) {
- System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_HEADER_LEN, pk.length());
- }
- return buf;
- }
-
/**
* prune untouched hbase columns
*/
@@ -206,8 +192,6 @@ public abstract class CubeHBaseRPC {
return ret;
}
-
-
public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) {
for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
byte[] byteFamily = hbaseColumn.getFirst();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index fa5a844..69b95ca 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -118,7 +118,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT,cubeSeg.getRowKeyPreambleSize());
IGTScanner rawScanner = store.scan(scanRequest);
final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 7731f19..303c360 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -43,13 +43,14 @@ public class HBaseReadonlyStore implements IGTStore {
private GTInfo info;
private List<Pair<byte[], byte[]>> hbaseColumns;
private List<List<Integer>> hbaseColumnsToGT;
+ private int rowkeyPreambleSize;
- public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT) {
+ public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize) {
this.cellListIterator = cellListIterator;
-
this.info = gtScanRequest.getInfo();
this.hbaseColumns = hbaseColumns;
this.hbaseColumnsToGT = hbaseColumnsToGT;
+ this.rowkeyPreambleSize = rowkeyPreambleSize;
}
@Override
@@ -108,7 +109,7 @@ public class HBaseReadonlyStore implements IGTStore {
// dimensions, set to primary key, also the 0th column block
Cell firstCell = oneRow.get(0);
- ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
+ ByteBuffer buf = byteBuffer(firstCell.getRowArray(), rowkeyPreambleSize + firstCell.getRowOffset(), firstCell.getRowLength() - rowkeyPreambleSize);
oneRecord.loadCellBlock(0, buf);
// metrics
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
deleted file mode 100644
index 65a963d..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.util.Arrays;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import com.google.common.base.Preconditions;
-
-public class HBaseScan {
-
- /**
- * every column in scan key is fixed length. for empty values, 0 zero will be populated
- */
- public static ByteArray exportScanKey(GTRecord rec, byte fill) {
-
- Preconditions.checkNotNull(rec);
-
- GTInfo info = rec.getInfo();
- int len = info.getMaxColumnLength(info.getPrimaryKey());
- ByteArray buf = ByteArray.allocate(len);
- int pos = 0;
- for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
- int c = info.getPrimaryKey().trueBitAt(i);
- int colLength = info.getCodeSystem().maxCodeLength(c);
-
- if (rec.get(c).array() != null) {
- Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " != cols[c].length: " + rec.get(c).length() + ", c is " + c);
- System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length());
- } else {
- Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
- }
- pos += colLength;
- }
- buf.setLength(pos);
-
- return buf;
- }
-
- /**
- * every column in scan key is fixed length. for fixed columns, 0 will be populated, for non-fixed columns, 1 will be populated
- */
- public static ByteArray exportScanMask(GTRecord rec) {
- Preconditions.checkNotNull(rec);
-
- GTInfo info = rec.getInfo();
- int len = info.getMaxColumnLength(info.getPrimaryKey());
- ByteArray buf = ByteArray.allocate(len);
- byte fill;
-
- int pos = 0;
- for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
- int c = info.getPrimaryKey().trueBitAt(i);
- int colLength = info.getCodeSystem().maxCodeLength(c);
-
- if (rec.get(c).array() != null) {
- fill = RowConstants.BYTE_ZERO;
- } else {
- fill = RowConstants.BYTE_ONE;
- }
- Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
- pos += colLength;
- }
- buf.setLength(pos);
-
- return buf;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 31d1901..3759738 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -147,7 +147,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
innerScanner = region.getScanner(scan);
InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
IGTScanner rawScanner = store.scan(scanReq);
CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 4e69dcc..7d9b313 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -46,27 +46,37 @@ public final class CubeVisitProtos {
*/
com.google.protobuf.ByteString getHbaseRawScan();
- // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;
+ // required int32 rowkeyPreambleSize = 4;
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ boolean hasRowkeyPreambleSize();
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ int getRowkeyPreambleSize();
+
+ // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;
+ /**
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList>
getHbaseColumnsToGTList();
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList getHbaseColumnsToGT(int index);
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
int getHbaseColumnsToGTCount();
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
java.util.List<? extends org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder>
getHbaseColumnsToGTOrBuilderList();
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
int index);
@@ -137,10 +147,15 @@ public final class CubeVisitProtos {
hbaseRawScan_ = input.readBytes();
break;
}
- case 34: {
- if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ case 32: {
+ bitField0_ |= 0x00000008;
+ rowkeyPreambleSize_ = input.readInt32();
+ break;
+ }
+ case 42: {
+ if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
hbaseColumnsToGT_ = new java.util.ArrayList<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList>();
- mutable_bitField0_ |= 0x00000008;
+ mutable_bitField0_ |= 0x00000010;
}
hbaseColumnsToGT_.add(input.readMessage(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.PARSER, extensionRegistry));
break;
@@ -153,7 +168,7 @@ public final class CubeVisitProtos {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
- if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
hbaseColumnsToGT_ = java.util.Collections.unmodifiableList(hbaseColumnsToGT_);
}
this.unknownFields = unknownFields.build();
@@ -767,36 +782,52 @@ public final class CubeVisitProtos {
return hbaseRawScan_;
}
- // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;
- public static final int HBASECOLUMNSTOGT_FIELD_NUMBER = 4;
+ // required int32 rowkeyPreambleSize = 4;
+ public static final int ROWKEYPREAMBLESIZE_FIELD_NUMBER = 4;
+ private int rowkeyPreambleSize_;
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public boolean hasRowkeyPreambleSize() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public int getRowkeyPreambleSize() {
+ return rowkeyPreambleSize_;
+ }
+
+ // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;
+ public static final int HBASECOLUMNSTOGT_FIELD_NUMBER = 5;
private java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList> hbaseColumnsToGT_;
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList> getHbaseColumnsToGTList() {
return hbaseColumnsToGT_;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public java.util.List<? extends org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder>
getHbaseColumnsToGTOrBuilderList() {
return hbaseColumnsToGT_;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public int getHbaseColumnsToGTCount() {
return hbaseColumnsToGT_.size();
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList getHbaseColumnsToGT(int index) {
return hbaseColumnsToGT_.get(index);
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
int index) {
@@ -807,6 +838,7 @@ public final class CubeVisitProtos {
behavior_ = "";
gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
+ rowkeyPreambleSize_ = 0;
hbaseColumnsToGT_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
@@ -826,6 +858,10 @@ public final class CubeVisitProtos {
memoizedIsInitialized = 0;
return false;
}
+ if (!hasRowkeyPreambleSize()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -842,8 +878,11 @@ public final class CubeVisitProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, hbaseRawScan_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt32(4, rowkeyPreambleSize_);
+ }
for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
- output.writeMessage(4, hbaseColumnsToGT_.get(i));
+ output.writeMessage(5, hbaseColumnsToGT_.get(i));
}
getUnknownFields().writeTo(output);
}
@@ -866,9 +905,13 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, hbaseRawScan_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(4, rowkeyPreambleSize_);
+ }
for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
size += com.google.protobuf.CodedOutputStream
- .computeMessageSize(4, hbaseColumnsToGT_.get(i));
+ .computeMessageSize(5, hbaseColumnsToGT_.get(i));
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@@ -908,6 +951,11 @@ public final class CubeVisitProtos {
result = result && getHbaseRawScan()
.equals(other.getHbaseRawScan());
}
+ result = result && (hasRowkeyPreambleSize() == other.hasRowkeyPreambleSize());
+ if (hasRowkeyPreambleSize()) {
+ result = result && (getRowkeyPreambleSize()
+ == other.getRowkeyPreambleSize());
+ }
result = result && getHbaseColumnsToGTList()
.equals(other.getHbaseColumnsToGTList());
result = result &&
@@ -935,6 +983,10 @@ public final class CubeVisitProtos {
hash = (37 * hash) + HBASERAWSCAN_FIELD_NUMBER;
hash = (53 * hash) + getHbaseRawScan().hashCode();
}
+ if (hasRowkeyPreambleSize()) {
+ hash = (37 * hash) + ROWKEYPREAMBLESIZE_FIELD_NUMBER;
+ hash = (53 * hash) + getRowkeyPreambleSize();
+ }
if (getHbaseColumnsToGTCount() > 0) {
hash = (37 * hash) + HBASECOLUMNSTOGT_FIELD_NUMBER;
hash = (53 * hash) + getHbaseColumnsToGTList().hashCode();
@@ -1055,9 +1107,11 @@ public final class CubeVisitProtos {
bitField0_ = (bitField0_ & ~0x00000002);
hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
+ rowkeyPreambleSize_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000008);
if (hbaseColumnsToGTBuilder_ == null) {
hbaseColumnsToGT_ = java.util.Collections.emptyList();
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
} else {
hbaseColumnsToGTBuilder_.clear();
}
@@ -1101,10 +1155,14 @@ public final class CubeVisitProtos {
to_bitField0_ |= 0x00000004;
}
result.hbaseRawScan_ = hbaseRawScan_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.rowkeyPreambleSize_ = rowkeyPreambleSize_;
if (hbaseColumnsToGTBuilder_ == null) {
- if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
hbaseColumnsToGT_ = java.util.Collections.unmodifiableList(hbaseColumnsToGT_);
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
}
result.hbaseColumnsToGT_ = hbaseColumnsToGT_;
} else {
@@ -1137,11 +1195,14 @@ public final class CubeVisitProtos {
if (other.hasHbaseRawScan()) {
setHbaseRawScan(other.getHbaseRawScan());
}
+ if (other.hasRowkeyPreambleSize()) {
+ setRowkeyPreambleSize(other.getRowkeyPreambleSize());
+ }
if (hbaseColumnsToGTBuilder_ == null) {
if (!other.hbaseColumnsToGT_.isEmpty()) {
if (hbaseColumnsToGT_.isEmpty()) {
hbaseColumnsToGT_ = other.hbaseColumnsToGT_;
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
} else {
ensureHbaseColumnsToGTIsMutable();
hbaseColumnsToGT_.addAll(other.hbaseColumnsToGT_);
@@ -1154,7 +1215,7 @@ public final class CubeVisitProtos {
hbaseColumnsToGTBuilder_.dispose();
hbaseColumnsToGTBuilder_ = null;
hbaseColumnsToGT_ = other.hbaseColumnsToGT_;
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
hbaseColumnsToGTBuilder_ =
com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
getHbaseColumnsToGTFieldBuilder() : null;
@@ -1180,6 +1241,10 @@ public final class CubeVisitProtos {
return false;
}
+ if (!hasRowkeyPreambleSize()) {
+
+ return false;
+ }
return true;
}
@@ -1348,13 +1413,46 @@ public final class CubeVisitProtos {
return this;
}
- // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;
+ // required int32 rowkeyPreambleSize = 4;
+ private int rowkeyPreambleSize_ ;
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public boolean hasRowkeyPreambleSize() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public int getRowkeyPreambleSize() {
+ return rowkeyPreambleSize_;
+ }
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public Builder setRowkeyPreambleSize(int value) {
+ bitField0_ |= 0x00000008;
+ rowkeyPreambleSize_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int32 rowkeyPreambleSize = 4;</code>
+ */
+ public Builder clearRowkeyPreambleSize() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ rowkeyPreambleSize_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;
private java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList> hbaseColumnsToGT_ =
java.util.Collections.emptyList();
private void ensureHbaseColumnsToGTIsMutable() {
- if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+ if (!((bitField0_ & 0x00000010) == 0x00000010)) {
hbaseColumnsToGT_ = new java.util.ArrayList<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList>(hbaseColumnsToGT_);
- bitField0_ |= 0x00000008;
+ bitField0_ |= 0x00000010;
}
}
@@ -1362,7 +1460,7 @@ public final class CubeVisitProtos {
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder> hbaseColumnsToGTBuilder_;
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList> getHbaseColumnsToGTList() {
if (hbaseColumnsToGTBuilder_ == null) {
@@ -1372,7 +1470,7 @@ public final class CubeVisitProtos {
}
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public int getHbaseColumnsToGTCount() {
if (hbaseColumnsToGTBuilder_ == null) {
@@ -1382,7 +1480,7 @@ public final class CubeVisitProtos {
}
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList getHbaseColumnsToGT(int index) {
if (hbaseColumnsToGTBuilder_ == null) {
@@ -1392,7 +1490,7 @@ public final class CubeVisitProtos {
}
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder setHbaseColumnsToGT(
int index, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList value) {
@@ -1409,7 +1507,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder setHbaseColumnsToGT(
int index, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder builderForValue) {
@@ -1423,7 +1521,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder addHbaseColumnsToGT(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList value) {
if (hbaseColumnsToGTBuilder_ == null) {
@@ -1439,7 +1537,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder addHbaseColumnsToGT(
int index, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList value) {
@@ -1456,7 +1554,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder addHbaseColumnsToGT(
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder builderForValue) {
@@ -1470,7 +1568,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder addHbaseColumnsToGT(
int index, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder builderForValue) {
@@ -1484,7 +1582,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder addAllHbaseColumnsToGT(
java.lang.Iterable<? extends org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList> values) {
@@ -1498,12 +1596,12 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder clearHbaseColumnsToGT() {
if (hbaseColumnsToGTBuilder_ == null) {
hbaseColumnsToGT_ = java.util.Collections.emptyList();
- bitField0_ = (bitField0_ & ~0x00000008);
+ bitField0_ = (bitField0_ & ~0x00000010);
onChanged();
} else {
hbaseColumnsToGTBuilder_.clear();
@@ -1511,7 +1609,7 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public Builder removeHbaseColumnsToGT(int index) {
if (hbaseColumnsToGTBuilder_ == null) {
@@ -1524,14 +1622,14 @@ public final class CubeVisitProtos {
return this;
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder getHbaseColumnsToGTBuilder(
int index) {
return getHbaseColumnsToGTFieldBuilder().getBuilder(index);
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
int index) {
@@ -1541,7 +1639,7 @@ public final class CubeVisitProtos {
}
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public java.util.List<? extends org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder>
getHbaseColumnsToGTOrBuilderList() {
@@ -1552,14 +1650,14 @@ public final class CubeVisitProtos {
}
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder addHbaseColumnsToGTBuilder() {
return getHbaseColumnsToGTFieldBuilder().addBuilder(
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.getDefaultInstance());
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder addHbaseColumnsToGTBuilder(
int index) {
@@ -1567,7 +1665,7 @@ public final class CubeVisitProtos {
index, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.getDefaultInstance());
}
/**
- * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 4;</code>
+ * <code>repeated .CubeVisitRequest.IntList hbaseColumnsToGT = 5;</code>
*/
public java.util.List<org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder>
getHbaseColumnsToGTBuilderList() {
@@ -1580,7 +1678,7 @@ public final class CubeVisitProtos {
hbaseColumnsToGTBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.Builder, org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder>(
hbaseColumnsToGT_,
- ((bitField0_ & 0x00000008) == 0x00000008),
+ ((bitField0_ & 0x00000010) == 0x00000010),
getParentForChildren(),
isClean());
hbaseColumnsToGT_ = null;
@@ -3655,23 +3753,24 @@ public final class CubeVisitProtos {
java.lang.String[] descriptorData = {
"\npstorage-hbase/src/main/java/org/apache" +
"/kylin/storage/hbase/cube/v2/coprocessor" +
- "/endpoint/protobuf/CubeVisit.proto\"\237\001\n\020C" +
+ "/endpoint/protobuf/CubeVisit.proto\"\273\001\n\020C" +
"ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
- "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\0223" +
- "\n\020hbaseColumnsToGT\030\004 \003(\0132\031.CubeVisitRequ" +
- "est.IntList\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\251\002\n" +
- "\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002" +
- "(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" +
- "ats\032\322\001\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022",
- "\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" +
- "nt\030\003 \001(\005\022\032\n\022aggregatedRowCount\030\004 \001(\005\022\025\n\r" +
- "systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" +
- "ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" +
- "\n\010hostname\030\010 \001(\t2F\n\020CubeVisitService\0222\n\t" +
- "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
- "ResponseB`\nEorg.apache.kylin.storage.hba" +
- "se.cube.v2.coprocessor.endpoint.generate" +
- "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+ "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" +
+ "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
+ "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\032" +
+ "\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\251\002\n\021CubeVisitRe" +
+ "sponse\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030" +
+ "\002 \002(\0132\030.CubeVisitResponse.Stats\032\322\001\n\005Stat",
+ "s\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEn" +
+ "dTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022" +
+ "aggregatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoa" +
+ "d\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022" +
+ "\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010" +
+ " \001(\t2F\n\020CubeVisitService\0222\n\tvisitCube\022\021." +
+ "CubeVisitRequest\032\022.CubeVisitResponseB`\nE" +
+ "org.apache.kylin.storage.hbase.cube.v2.c" +
+ "oprocessor.endpoint.generatedB\017CubeVisit" +
+ "ProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3683,7 +3782,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitRequest_descriptor,
- new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "HbaseColumnsToGT", });
+ new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", });
internal_static_CubeVisitRequest_IntList_descriptor =
internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index a6ad308..aa05c0b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -15,7 +15,8 @@ message CubeVisitRequest {
required string behavior = 1;
required bytes gtScanRequest = 2;
required bytes hbaseRawScan = 3;
- repeated IntList hbaseColumnsToGT = 4;
+ required int32 rowkeyPreambleSize = 4;
+ repeated IntList hbaseColumnsToGT = 5;
message IntList {
repeated int32 ints = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b29bd81..0ed374f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -55,7 +55,6 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -179,7 +178,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
IOUtils.closeStream(tempFileStream);
}
Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
-
+
FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
int samplingPercentage = 25;
SequenceFile.Reader reader = null;
@@ -257,7 +256,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
Map<Long, Double> cubeSizeMap = Maps.newHashMap();
for (Map.Entry<Long, Long> entry : cubeRowCountMap.entrySet()) {
- cubeSizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeDesc, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
+ cubeSizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
}
for (Double cuboidSize : cubeSizeMap.values()) {
@@ -357,14 +356,11 @@ public class CreateHTableJob extends AbstractHadoopJob {
/**
* Estimate the cuboid's size
*
- * @param cubeDesc
- * @param cuboidId
- * @param rowCount
* @return the cuboid size in M bytes
*/
- private static double estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
+ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
- int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
+ int bytesLength = cubeSegment.getRowKeyPreambleSize();
long mask = Long.highestOneBit(baseCuboidId);
long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
@@ -377,7 +373,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
// add the measure length
int space = 0;
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
DataType returnType = measureDesc.getFunction().getReturnDataType();
if (returnType.isHLLC()) {
// for HLL, it will be compressed when export to bytes
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index d857fb1..b63b1f3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -34,22 +34,16 @@
package org.apache.kylin.storage.hbase.steps;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -70,11 +64,13 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
private final List<KeyValueCreator> keyValueCreators;
private final int nColumns;
private final HTableInterface hTable;
- private final ByteBuffer byteBuffer;
private final CubeDesc cubeDesc;
private final CubeSegment cubeSegment;
private final Object[] measureValues;
+
private List<Put> puts = Lists.newArrayList();
+ private AbstractRowKeyEncoder rowKeyEncoder;
+ private byte[] keybuf;
public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) {
this.keyValueCreators = Lists.newArrayList();
@@ -87,7 +83,6 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
}
this.nColumns = keyValueCreators.size();
this.hTable = hTable;
- this.byteBuffer = ByteBuffer.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
this.measureValues = new Object[cubeDesc.getMeasures().size()];
}
@@ -97,38 +92,28 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
return result;
}
- private ByteBuffer createKey(Long cuboidId, GTRecord record) {
- byteBuffer.clear();
- byteBuffer.put(Bytes.toBytes((short) 0), 0, RowConstants.ROWKEY_SHARDID_LEN);//occupy space first
- byteBuffer.put(Bytes.toBytes(cuboidId), 0, RowConstants.ROWKEY_CUBOIDID_LEN);
- final int cardinality = BitSet.valueOf(new long[] { cuboidId }).cardinality();
- for (int i = 0; i < cardinality; i++) {
- final ByteArray byteArray = record.get(i);
- byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
+ //TODO:shardingonstreaming
+ private byte[] createKey(Long cuboidId, GTRecord record) {
+ if (rowKeyEncoder == null || rowKeyEncoder.getCuboidID() != cuboidId) { // encoder may still be unset on the first call; (re)build it lazily per cuboid
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+ keybuf = rowKeyEncoder.createBuf();
}
+ rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keybuf);
+ return keybuf;
- //fill shard
- short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId);
- short shardOffset = ShardingHash.getShard(byteBuffer.array(), //
- RowConstants.ROWKEY_HEADER_LEN, byteBuffer.position() - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum);
- Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId);
- short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards());
- BytesUtil.writeShort(finalShard, byteBuffer.array(), 0, RowConstants.ROWKEY_SHARDID_LEN);
-
- return byteBuffer;
}
@Override
public void write(long cuboidId, GTRecord record) throws IOException {
- final ByteBuffer key = createKey(cuboidId, record);
+ byte[] key = createKey(cuboidId, record);
final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
final int nDims = cuboid.getColumns().size();
final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
for (int i = 0; i < nColumns; i++) {
final Object[] values = record.getValues(bitSet, measureValues);
- final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values);
- final Put put = new Put(copy(key.array(), 0, key.position()));
+ final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, key.length, values);
+ final Put put = new Put(copy(key, 0, key.length));
byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
@@ -158,7 +143,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
@Override
public void close() {
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 7b4e1a4..2d4fb90 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -41,6 +41,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
*/
public class HBaseStreamingOutput implements IStreamingOutput {
@@ -51,6 +53,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
public ICuboidWriter getCuboidWriter(IBuildable buildable) {
try {
CubeSegment cubeSegment = (CubeSegment) buildable;
+
+ //If ever attempt to enable sharding on streaming please also check //TODO:shardingonstreaming
+ Preconditions.checkArgument(!cubeSegment.isEnableSharding(), "Streaming table not allowed to use sharding");
+
final HTableInterface hTable;
hTable = createHTable(cubeSegment);
return new HBaseCuboidWriter(cubeSegment, hTable);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad3622e1/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index a88e147..5c1a780 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -206,7 +206,7 @@ public class AggregateRegionObserverTest {
t.setDatabase("DEFAULT");
TblColRef[] cols = new TblColRef[] { newCol(1, "A", t), newCol(2, "B", t), newCol(3, "C", t), newCol(4, "D", t) };
int[] sizes = new int[] { 1, 1, 1, 1 };
- return new CoprocessorRowType(cols, sizes);
+ return new CoprocessorRowType(cols, sizes,RowConstants.ROWKEY_HEADER_LEN);
}
private TblColRef newCol(int i, String name, TableDesc t) {