You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:42 UTC

[28/31] incubator-kylin git commit: KYLIN-1112 fix inverted index query bug

KYLIN-1112 fix inverted index query bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b7cb41a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b7cb41a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b7cb41a8

Branch: refs/heads/KYLIN-1112
Commit: b7cb41a8b390b8b391fb038f5ce688dc03887089
Parents: bb3aa4f
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 4 20:28:11 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:17 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  8 +++----
 .../org/apache/kylin/common/util/BytesUtil.java | 10 +++++++++
 .../measure/fixedlen/FixedHLLCodec.java         |  6 +++++
 .../measure/fixedlen/FixedLenMeasureCodec.java  |  4 ++++
 .../measure/fixedlen/FixedPointLongCodec.java   |  8 +++++++
 .../endpoint/EndpointAggregators.java           | 23 +++++++++++++-------
 .../endpoint/EndpointTupleIterator.java         |  7 +++---
 .../ii/coprocessor/endpoint/IIEndpoint.java     | 13 ++++++++---
 8 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 800b615..200156a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -15,8 +16,6 @@ import com.google.common.base.Function;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
 
-import com.google.protobuf.HBaseZeroCopyByteString;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -235,9 +234,10 @@ public class IITest extends LocalFileMetadataTestCase {
         System.out.println(response.getRowsList().size());
         Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
         for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
-            byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(responseRow.getMeasures());
-            List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
+            ByteBuffer bf = responseRow.getMeasures().asReadOnlyByteBuffer();
+            List<Object> metrics = aggregators.deserializeMetricValues(bf);
             Assert.assertTrue(answers.contains(metrics.get(0)));
+            answers.remove(metrics.get(0));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0d4dba9..e01ce4f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -57,6 +57,16 @@ public class BytesUtil {
         return integer;
     }
 
+
+    public static long readLong(ByteBuffer buffer, int size) {
+        long integer = 0;
+        for (int i = 0; i < size; i++) {
+            integer <<= 8;
+            integer |= (long) buffer.get() & 0xFF;
+        }
+        return integer;
+    }
+
     public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
         for (int i = offset + size - 1; i >= offset; i--) {
             bytes[i] = (byte) num;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index 85dfe14..a0af6a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -71,4 +71,10 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
     public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
         v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
     }
+
+    @Override
+    public HyperLogLogPlusCounter read(ByteBuffer buffer) {
+        current.readRegistersArray(buffer);
+        return current;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index ad8c483..54c4eb8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,6 +20,8 @@ package org.apache.kylin.metadata.measure.fixedlen;
 
 import org.apache.kylin.metadata.model.DataType;
 
+import java.nio.ByteBuffer;
+
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {
@@ -42,4 +44,6 @@ abstract public class FixedLenMeasureCodec<T> {
 
     abstract public void write(T v, byte[] buf, int offset);
 
+    abstract public T read(ByteBuffer buffer);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index 4014c21..37eb0be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -22,6 +22,8 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.DataType;
 
+import java.nio.ByteBuffer;
+
 public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
 
     private static final int SIZE = 8;
@@ -106,4 +108,10 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
     public void write(LongMutable v, byte[] buf, int offset) {
         BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE);
     }
+
+    @Override
+    public LongMutable read(ByteBuffer buffer) {
+        current.set(BytesUtil.readLong(buffer, SIZE));
+        return current;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
index 3a0049d..95cca0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
@@ -199,26 +199,26 @@ public class EndpointAggregators {
      * @param buffer byte buffer to get the metric data
      * @return length of metric data
      */
-    public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer) {
+    public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer, int offset) {
         for (int i = 0; i < funcNames.length; i++) {
             metricValues[i] = aggrs[i].getState();
         }
 
-        int metricBytesOffset = 0;
+        int metricBytesOffset = offset;
+        int length = 0;
         for (int i = 0; i < measureSerializers.length; i++) {
             measureSerializers[i].write(metricValues[i], buffer, metricBytesOffset);
             metricBytesOffset += measureSerializers[i].getLength();
+            length += measureSerializers[i].getLength();
         }
-        return metricBytesOffset;
+        return length;
     }
-
-    public List<Object> deserializeMetricValues(byte[] metricBytes, int offset) {
+    
+    public List<Object> deserializeMetricValues(ByteBuffer buffer) {
         List<Object> ret = Lists.newArrayList();
-        int metricBytesOffset = offset;
         for (int i = 0; i < measureSerializers.length; i++) {
-            measureSerializers[i].read(metricBytes, metricBytesOffset);
+            measureSerializers[i].read(buffer);
             Object valueString = measureSerializers[i].getValue();
-            metricBytesOffset += measureSerializers[i].getLength();
             ret.add(valueString);
         }
         return ret;
@@ -279,4 +279,11 @@ public class EndpointAggregators {
 
     }
 
+    public int getMeasureSerializeLength() {
+        int length = 0;
+        for (int i = 0; i < this.measureSerializers.length; ++i) {
+            length += this.measureSerializers[i].getLength();
+        }
+        return length;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 6d3ec4d..99db123 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.Map;
 
 import javax.annotation.Nullable;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -340,9 +342,8 @@ public class EndpointTupleIterator implements ITupleIterator {
             byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
             this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
             if (currentRow.hasMeasures()) {
-                byte[] measuresBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getMeasures());
-
-                this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
+                ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
+                this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer);
             }
 
             index++;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b7cb41a8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
index 6173241..e4360a2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
@@ -205,7 +205,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary);
 
         byte[] recordBuffer = new byte[recordInfo.getByteFormLen()];
-        final byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
+        byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
 
         int iteratedSliceCount = 0;
         long latestSliceTs = Long.MIN_VALUE;
@@ -267,11 +267,18 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         logger.info("Iterated Slices count: " + iteratedSliceCount);
 
         if (needAgg) {
+            int offset = 0;
+            int measureLength = aggregators.getMeasureSerializeLength();
             for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
                 AggrKey aggrKey = entry.getKey();
                 IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
-                int length = aggregators.serializeMetricValues(entry.getValue(), buffer);
-                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, 0, length));
+                if (offset + measureLength > buffer.length) {
+                    buffer =  new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
+                    offset = 0;
+                }
+                int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);
+                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length));
+                offset += length;
                 responseBuilder.addRows(rowBuilder.build());
             }
         }