You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2017/03/23 02:34:24 UTC
kylin git commit: KYLIN-2501 bugfix, pass IT
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2501 10fc77660 -> 498aa2562
KYLIN-2501 bugfix, pass IT
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/498aa256
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/498aa256
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/498aa256
Branch: refs/heads/KYLIN-2501
Commit: 498aa256224ff8e74100b5dade6bd9edaad8eb28
Parents: 10fc776
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 22 16:31:45 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Wed Mar 22 17:53:13 2017 +0800
----------------------------------------------------------------------
.../gridtable/GTStreamAggregateScanner.java | 24 +++--
.../apache/kylin/storage/StorageContext.java | 12 ---
.../gtrecord/GTCubeStorageQueryBase.java | 7 --
.../storage/gtrecord/PartitionResultMerger.java | 100 -------------------
.../gtrecord/SegmentCubeTupleIterator.java | 7 +-
.../SortMergedPartitionResultIterator.java | 81 +++++++++++++++
.../gtrecord/StorageResponseGTScatter.java | 13 ++-
7 files changed, 108 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 1fde423..4eb5791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -18,6 +18,7 @@
package org.apache.kylin.gridtable;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.kylin.GTForwardingScanner;
@@ -38,11 +39,10 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private final GTScanRequest req;
private final Comparator<GTRecord> keyComparator;
- public GTStreamAggregateScanner(IGTScanner delegated,
- GTScanRequest req, Comparator<GTRecord> keyComparator) {
+ public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) {
super(delegated);
- this.req = req;
- this.keyComparator = keyComparator;
+ this.req = Preconditions.checkNotNull(scanRequest, "scanRequest");
+ this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy());
}
@Override
@@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
private int[] gtDimsIdx;
- private int[] gtMetricsIdx;
+ private int[] gtMetricsIdx; // specify which metric to return and their order
+ private int[] aggIdx; // specify the ith returning metric's aggStates index
+
private Object[] result; // avoid object creation
StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
super(input);
this.gtDimsIdx = gtDimsIdx;
this.gtMetricsIdx = gtMetricsIdx;
- result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+ this.aggIdx = new int[gtMetricsIdx.length];
+ for (int i = 0; i < aggIdx.length; i++) {
+ int metricIdx = gtMetricsIdx[i];
+ aggIdx[i] = metrics.trueBitIndexOf(metricIdx);
+ }
+
+ this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
}
private void decodeAndSetDimensions(GTRecord record) {
@@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
decodeAndSetDimensions(record);
// set metrics
- for (int i = 0; i < gtMetricsIdx.length; i++) {
- result[gtDimsIdx.length + i] = aggStates[i];
+ for (int i = 0; i < aggIdx.length; i++) {
+ result[gtDimsIdx.length + i] = aggStates[aggIdx[i]];
}
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index bb17054..4522261 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,12 +18,10 @@
package org.apache.kylin.storage;
-import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
@@ -49,9 +47,7 @@ public class StorageContext {
private boolean exactAggregation = false;
private boolean needStorageAggregation = false;
private boolean enableCoprocessor = false;
-
private boolean enableStreamAggregate = false;
- private Comparator<GTRecord> groupKeyComparator;
private IStorageQuery storageQuery;
private AtomicLong processedRowCount = new AtomicLong();
@@ -242,12 +238,4 @@ public class StorageContext {
public void enableStreamAggregate() {
this.enableStreamAggregate = true;
}
-
- public Comparator<GTRecord> getGroupKeyComparator() {
- return groupKeyComparator;
- }
-
- public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
- this.groupKeyComparator = groupKeyComparator;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 82590a2..d91a0b4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,18 +26,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (enabled) {
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
- ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
-
context.enableStreamAggregate();
- context.setGroupKeyComparator(GTRecord.getComparator(cols));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
deleted file mode 100644
index 52029d3..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gtrecord;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-
-/**
- * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
- */
-public class PartitionResultMerger implements Iterable<GTRecord> {
- private final ImmutableList<PartitionResultIterator> partitionResults;
- private final GTInfo info;
- private final Comparator<GTRecord> comparator;
-
- public PartitionResultMerger(
- Iterable<PartitionResultIterator> partitionResults,
- GTInfo info, Comparator<GTRecord> comparator) {
- this.partitionResults = ImmutableList.copyOf(partitionResults);
- this.info = info;
- this.comparator = comparator;
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- if (partitionResults.size() == 1) {
- return partitionResults.get(0);
- }
- return new MergingResultsIterator();
- }
-
- private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
- final GTRecord record = new GTRecord(info); // reuse to avoid object creation
-
- PriorityQueue<PeekingIterator<GTRecord>> heap;
-
- MergingResultsIterator() {
- Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
- public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
- return comparator.compare(o1.peek(), o2.peek());
- }
- };
- this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
-
- for (PartitionResultIterator it : partitionResults) {
- if (it.hasNext()) {
- heap.offer(Iterators.peekingIterator(it));
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return !heap.isEmpty();
- }
-
- @Override
- public GTRecord next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- // get smallest record
- PeekingIterator<GTRecord> it = heap.poll();
- // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
- // so we must make a shallow copy of it.
- record.shallowCopyFrom(it.next());
-
- if (it.hasNext()) {
- heap.offer(it);
- }
-
- return record;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 11f766c..3bac5ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
final Iterator<GTRecord> records, final GTScanRequest scanRequest,
final int[] gtDimsIdx, final int[] gtMetricsIdx) {
- boolean singlePartitionResult = records instanceof PartitionResultIterator;
- if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+ boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator;
+ if (hasMultiplePartitions && context.isStreamAggregateEnabled()) {
// input records are ordered, leverage stream aggregator to produce possibly fewer records
IGTScanner inputScanner = new IGTScanner() {
public GTInfo getInfo() {
@@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
return records;
}
};
- GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
- inputScanner, scanRequest, context.getGroupKeyComparator());
+ GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest);
return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
new file mode 100644
index 0000000..21e61e3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sorts {@code GTRecord}s from all partitions, assuming each partition is already sorted by {@code comparator}.
+ */
+public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+
+    private final GTRecord record; // reused by every next() call to avoid object creation
+    private final PriorityQueue<PeekingIterator<GTRecord>> heap; // non-exhausted partitions, ordered by head record
+
+    SortMergedPartitionResultIterator(
+            List<PartitionResultIterator> partitionResults,
+            GTInfo info, final Comparator<GTRecord> comparator) {
+        this.record = new GTRecord(info);
+        Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+            public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        };
+        // PriorityQueue rejects an initial capacity < 1, so clamp it to support an empty partition list
+        this.heap = new PriorityQueue<>(Math.max(1, partitionResults.size()), heapComparator);
+
+        for (PartitionResultIterator it : partitionResults) {
+            if (it.hasNext()) {
+                heap.offer(Iterators.peekingIterator(it));
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !heap.isEmpty();
+    }
+
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        // take the partition whose head record is smallest; note the returned record object is reused
+        PeekingIterator<GTRecord> it = heap.poll();
+        // WATCH OUT! record got from PartitionResultIterator.next() may be changed later,
+        // so we must make a shallow copy of it.
+        record.shallowCopyFrom(it.next());
+
+        if (it.hasNext()) {
+            heap.offer(it);
+        }
+
+        return record;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/498aa256/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 0f1e191..f1ab20c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner {
private IPartitionStreamer partitionStreamer;
private final Iterator<byte[]> blocks;
private final ImmutableBitSet columns;
- private final StorageContext context;
+ private final ImmutableBitSet groupByDims;
private final boolean needSorted; // whether scanner should return sorted records
public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
@@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner {
this.partitionStreamer = partitionStreamer;
this.blocks = partitionStreamer.asByteArrayIterator();
this.columns = scanRequest.getColumns();
- this.context = context;
+ this.groupByDims = scanRequest.getAggrGroupBy();
this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
}
@@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner {
partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
}
+ if (partitionResults.size() == 1) {
+ return partitionResults.get(0);
+ }
+
if (!needSorted) {
logger.debug("Using Iterators.concat to merge partition results");
return Iterators.concat(partitionResults.iterator());
}
- logger.debug("Using PartitionResultMerger to merge partition results");
- PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
- return merger.iterator();
+ logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+ return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
}
}