You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2017/03/23 02:34:24 UTC

kylin git commit: KYLIN-2501 bugfix, pass IT

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2501 10fc77660 -> 498aa2562


KYLIN-2501 bugfix, pass IT


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/498aa256
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/498aa256
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/498aa256

Branch: refs/heads/KYLIN-2501
Commit: 498aa256224ff8e74100b5dade6bd9edaad8eb28
Parents: 10fc776
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 22 16:31:45 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Wed Mar 22 17:53:13 2017 +0800

----------------------------------------------------------------------
 .../gridtable/GTStreamAggregateScanner.java     |  24 +++--
 .../apache/kylin/storage/StorageContext.java    |  12 ---
 .../gtrecord/GTCubeStorageQueryBase.java        |   7 --
 .../storage/gtrecord/PartitionResultMerger.java | 100 -------------------
 .../gtrecord/SegmentCubeTupleIterator.java      |   7 +-
 .../SortMergedPartitionResultIterator.java      |  81 +++++++++++++++
 .../gtrecord/StorageResponseGTScatter.java      |  13 ++-
 7 files changed, 108 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 1fde423..4eb5791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.gridtable;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.apache.kylin.GTForwardingScanner;
@@ -38,11 +39,10 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
     private final GTScanRequest req;
     private final Comparator<GTRecord> keyComparator;
 
-    public GTStreamAggregateScanner(IGTScanner delegated,
-            GTScanRequest req, Comparator<GTRecord> keyComparator) {
+    public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) {
         super(delegated);
-        this.req = req;
-        this.keyComparator = keyComparator;
+        this.req = Preconditions.checkNotNull(scanRequest, "scanRequest");
+        this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy());
     }
 
     @Override
@@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
     private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
 
         private int[] gtDimsIdx;
-        private int[] gtMetricsIdx;
+        private int[] gtMetricsIdx; // specifies which metrics to return and in what order
+        private int[] aggIdx; // aggIdx[i] is the aggStates index of the i-th returned metric
+
         private Object[] result; // avoid object creation
 
         StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
             super(input);
             this.gtDimsIdx = gtDimsIdx;
             this.gtMetricsIdx = gtMetricsIdx;
-            result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+            this.aggIdx = new int[gtMetricsIdx.length];
+            for (int i = 0; i < aggIdx.length; i++) {
+                int metricIdx = gtMetricsIdx[i];
+                aggIdx[i] = metrics.trueBitIndexOf(metricIdx);
+            }
+
+            this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
         }
 
         private void decodeAndSetDimensions(GTRecord record) {
@@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
         protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
             decodeAndSetDimensions(record);
             // set metrics
-            for (int i = 0; i < gtMetricsIdx.length; i++) {
-                result[gtDimsIdx.length + i] = aggStates[i];
+            for (int i = 0; i < aggIdx.length; i++) {
+                result[gtDimsIdx.length + i] = aggStates[aggIdx[i]];
             }
             return result;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index bb17054..4522261 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.storage;
 
-import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
@@ -49,9 +47,7 @@ public class StorageContext {
     private boolean exactAggregation = false;
     private boolean needStorageAggregation = false;
     private boolean enableCoprocessor = false;
-
     private boolean enableStreamAggregate = false;
-    private Comparator<GTRecord> groupKeyComparator;
 
     private IStorageQuery storageQuery;
     private AtomicLong processedRowCount = new AtomicLong();
@@ -242,12 +238,4 @@ public class StorageContext {
     public void enableStreamAggregate() {
         this.enableStreamAggregate = true;
     }
-
-    public Comparator<GTRecord> getGroupKeyComparator() {
-        return groupKeyComparator;
-    }
-
-    public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
-        this.groupKeyComparator = groupKeyComparator;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 82590a2..d91a0b4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,18 +26,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
 
         if (enabled) {
-            CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-            ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
-
             context.enableStreamAggregate();
-            context.setGroupKeyComparator(GTRecord.getComparator(cols));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
deleted file mode 100644
index 52029d3..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gtrecord;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-
-/**
- * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
- */
-public class PartitionResultMerger implements Iterable<GTRecord> {
-    private final ImmutableList<PartitionResultIterator> partitionResults;
-    private final GTInfo info;
-    private final Comparator<GTRecord> comparator;
-
-    public PartitionResultMerger(
-            Iterable<PartitionResultIterator> partitionResults,
-            GTInfo info, Comparator<GTRecord> comparator) {
-        this.partitionResults = ImmutableList.copyOf(partitionResults);
-        this.info = info;
-        this.comparator = comparator;
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        if (partitionResults.size() == 1) {
-            return partitionResults.get(0);
-        }
-        return new MergingResultsIterator();
-    }
-
-    private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
-        final GTRecord record = new GTRecord(info); // reuse to avoid object creation
-
-        PriorityQueue<PeekingIterator<GTRecord>> heap;
-
-        MergingResultsIterator() {
-            Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
-                public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
-                    return comparator.compare(o1.peek(), o2.peek());
-                }
-            };
-            this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
-
-            for (PartitionResultIterator it : partitionResults) {
-                if (it.hasNext()) {
-                    heap.offer(Iterators.peekingIterator(it));
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !heap.isEmpty();
-        }
-
-        @Override
-        public GTRecord next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            // get smallest record
-            PeekingIterator<GTRecord> it = heap.poll();
-            // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
-            // so we must make a shallow copy of it.
-            record.shallowCopyFrom(it.next());
-
-            if (it.hasNext()) {
-                heap.offer(it);
-            }
-
-            return record;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 11f766c..3bac5ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
             final Iterator<GTRecord> records, final GTScanRequest scanRequest,
             final int[] gtDimsIdx, final int[] gtMetricsIdx) {
 
-        boolean singlePartitionResult = records instanceof PartitionResultIterator;
-        if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+        boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator;
+        if (hasMultiplePartitions && context.isStreamAggregateEnabled()) {
             // input records are ordered, leverage stream aggregator to produce possibly fewer records
             IGTScanner inputScanner = new IGTScanner() {
                 public GTInfo getInfo() {
@@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
                     return records;
                 }
             };
-            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
-                    inputScanner, scanRequest, context.getGroupKeyComparator());
+            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest);
             return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
new file mode 100644
index 0000000..21e61e3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sorts the {@code GTRecord}s of all partitions, assuming each partition contains sorted elements.
+ */
+public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+
+    final GTRecord record ; // reuse to avoid object creation
+    PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+    SortMergedPartitionResultIterator(
+            List<PartitionResultIterator> partitionResults,
+            GTInfo info, final Comparator<GTRecord> comparator) {
+
+        this.record = new GTRecord(info);
+        Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+            public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        };
+        this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
+
+        for (PartitionResultIterator it : partitionResults) {
+            if (it.hasNext()) {
+                heap.offer(Iterators.peekingIterator(it));
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !heap.isEmpty();
+    }
+
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        // get smallest record
+        PeekingIterator<GTRecord> it = heap.poll();
+        // WATCH OUT! The record returned by PartitionResultIterator.next() may be changed later,
+        // so we must make a shallow copy of it.
+        record.shallowCopyFrom(it.next());
+
+        if (it.hasNext()) {
+            heap.offer(it);
+        }
+
+        return record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 0f1e191..f1ab20c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner {
     private IPartitionStreamer partitionStreamer;
     private final Iterator<byte[]> blocks;
     private final ImmutableBitSet columns;
-    private final StorageContext context;
+    private final ImmutableBitSet groupByDims;
     private final boolean needSorted; // whether scanner should return sorted records
 
     public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
@@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner {
         this.partitionStreamer = partitionStreamer;
         this.blocks = partitionStreamer.asByteArrayIterator();
         this.columns = scanRequest.getColumns();
-        this.context = context;
+        this.groupByDims = scanRequest.getAggrGroupBy();
         this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
     }
 
@@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner {
             partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
         }
 
+        if (partitionResults.size() == 1) {
+            return partitionResults.get(0);
+        }
+
         if (!needSorted) {
             logger.debug("Using Iterators.concat to merge partition results");
             return Iterators.concat(partitionResults.iterator());
         }
 
-        logger.debug("Using PartitionResultMerger to merge partition results");
-        PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
-        return merger.iterator();
+        logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+        return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
     }
 }