You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:42 UTC
[28/31] incubator-kylin git commit: KYLIN-1112 fix inverted index query bug
KYLIN-1112 fix inverted index query bug
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b7cb41a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b7cb41a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b7cb41a8
Branch: refs/heads/KYLIN-1112
Commit: b7cb41a8b390b8b391fb038f5ce688dc03887089
Parents: bb3aa4f
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 4 20:28:11 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:17 2015 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/invertedindex/IITest.java | 8 +++----
.../org/apache/kylin/common/util/BytesUtil.java | 10 +++++++++
.../measure/fixedlen/FixedHLLCodec.java | 6 +++++
.../measure/fixedlen/FixedLenMeasureCodec.java | 4 ++++
.../measure/fixedlen/FixedPointLongCodec.java | 8 +++++++
.../endpoint/EndpointAggregators.java | 23 +++++++++++++-------
.../endpoint/EndpointTupleIterator.java | 7 +++---
.../ii/coprocessor/endpoint/IIEndpoint.java | 13 ++++++++---
8 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 800b615..200156a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -1,6 +1,7 @@
package org.apache.kylin.job.hadoop.invertedindex;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -15,8 +16,6 @@ import com.google.common.base.Function;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
-import com.google.protobuf.HBaseZeroCopyByteString;
-
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -235,9 +234,10 @@ public class IITest extends LocalFileMetadataTestCase {
System.out.println(response.getRowsList().size());
Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
- byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
- List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
+ ByteBuffer bf = responseRow.getMeasures().asReadOnlyByteBuffer();
+ List<Object> metrics = aggregators.deserializeMetricValues(bf);
Assert.assertTrue(answers.contains(metrics.get(0)));
+ answers.remove(metrics.get(0));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0d4dba9..e01ce4f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -57,6 +57,16 @@ public class BytesUtil {
return integer;
}
+
+ public static long readLong(ByteBuffer buffer, int size) {
+ long integer = 0;
+ for (int i = 0; i < size; i++) {
+ integer <<= 8;
+ integer |= (long) buffer.get() & 0xFF;
+ }
+ return integer;
+ }
+
public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
for (int i = offset + size - 1; i >= offset; i--) {
bytes[i] = (byte) num;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index 85dfe14..a0af6a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -71,4 +71,10 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
}
+
+ @Override
+ public HyperLogLogPlusCounter read(ByteBuffer buffer) {
+ current.readRegistersArray(buffer);
+ return current;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index ad8c483..54c4eb8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,6 +20,8 @@ package org.apache.kylin.metadata.measure.fixedlen;
import org.apache.kylin.metadata.model.DataType;
+import java.nio.ByteBuffer;
+
abstract public class FixedLenMeasureCodec<T> {
public static FixedLenMeasureCodec<?> get(DataType type) {
@@ -42,4 +44,6 @@ abstract public class FixedLenMeasureCodec<T> {
abstract public void write(T v, byte[] buf, int offset);
+ abstract public T read(ByteBuffer buffer);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index 4014c21..37eb0be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -22,6 +22,8 @@ import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.measure.LongMutable;
import org.apache.kylin.metadata.model.DataType;
+import java.nio.ByteBuffer;
+
public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
private static final int SIZE = 8;
@@ -106,4 +108,10 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
public void write(LongMutable v, byte[] buf, int offset) {
BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE);
}
+
+ @Override
+ public LongMutable read(ByteBuffer buffer) {
+ current.set(BytesUtil.readLong(buffer, SIZE));
+ return current;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
index 3a0049d..95cca0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
@@ -199,26 +199,26 @@ public class EndpointAggregators {
* @param buffer byte buffer to get the metric data
* @return length of metric data
*/
- public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer) {
+ public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer, int offset) {
for (int i = 0; i < funcNames.length; i++) {
metricValues[i] = aggrs[i].getState();
}
- int metricBytesOffset = 0;
+ int metricBytesOffset = offset;
+ int length = 0;
for (int i = 0; i < measureSerializers.length; i++) {
measureSerializers[i].write(metricValues[i], buffer, metricBytesOffset);
metricBytesOffset += measureSerializers[i].getLength();
+ length += measureSerializers[i].getLength();
}
- return metricBytesOffset;
+ return length;
}
-
- public List<Object> deserializeMetricValues(byte[] metricBytes, int offset) {
+
+ public List<Object> deserializeMetricValues(ByteBuffer buffer) {
List<Object> ret = Lists.newArrayList();
- int metricBytesOffset = offset;
for (int i = 0; i < measureSerializers.length; i++) {
- measureSerializers[i].read(metricBytes, metricBytesOffset);
+ measureSerializers[i].read(buffer);
Object valueString = measureSerializers[i].getValue();
- metricBytesOffset += measureSerializers[i].getLength();
ret.add(valueString);
}
return ret;
@@ -279,4 +279,11 @@ public class EndpointAggregators {
}
+ public int getMeasureSerializeLength() {
+ int length = 0;
+ for (int i = 0; i < this.measureSerializers.length; ++i) {
+ length += this.measureSerializers[i].getLength();
+ }
+ return length;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 6d3ec4d..99db123 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.Map;
import javax.annotation.Nullable;
+import com.google.protobuf.ByteString;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.hbase.client.HConnection;
@@ -340,9 +342,8 @@ public class EndpointTupleIterator implements ITupleIterator {
byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
if (currentRow.hasMeasures()) {
- byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getMeasures());
-
- this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
+ ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
+ this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer);
}
index++;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 6173241..e4360a2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -205,7 +205,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary);
byte[] recordBuffer = new byte[recordInfo.getByteFormLen()];
- final byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
+ byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
int iteratedSliceCount = 0;
long latestSliceTs = Long.MIN_VALUE;
@@ -267,11 +267,18 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
logger.info("Iterated Slices count: " + iteratedSliceCount);
if (needAgg) {
+ int offset = 0;
+ int measureLength = aggregators.getMeasureSerializeLength();
for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
AggrKey aggrKey = entry.getKey();
IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
- int length = aggregators.serializeMetricValues(entry.getValue(), buffer);
- rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, 0, length));
+ if (offset + measureLength > buffer.length) {
+ buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
+ offset = 0;
+ }
+ int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);
+ rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length));
+ offset += length;
responseBuilder.addRows(rowBuilder.build());
}
}