You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:19 UTC
[05/31] incubator-kylin git commit: KYLIN-1094 use Kryo Serializer
for SparkCubing
KYLIN-1094 use Kryo Serializer for SparkCubing
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/730739f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/730739f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/730739f7
Branch: refs/heads/KYLIN-1112
Commit: 730739f70797f296cb9cb0a1cb94c8d7649f769e
Parents: 6ab480c
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 29 11:30:06 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:15 2015 +0800
----------------------------------------------------------------------
commit_SHA1 | 16 --
.../cube/inmemcubing/InMemCubeBuilder.java | 159 ++-----------------
.../InMemCubeBuilderInputConverter.java | 146 +++++++++++++++++
.../cube/inmemcubing/InMemCubeBuilderUtils.java | 85 ++++++++++
.../apache/kylin/engine/spark/SparkCubing.java | 86 ++++++++--
5 files changed, 316 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/commit_SHA1
----------------------------------------------------------------------
diff --git a/commit_SHA1 b/commit_SHA1
deleted file mode 100644
index d2f3970..0000000
--- a/commit_SHA1
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-b66c25803a2f976cca067148278dbe7d7b0d79ef
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 7d943a3..84abd47 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -17,14 +17,7 @@
package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,11 +25,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.*;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
@@ -50,14 +44,12 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.measure.DoubleMutable;
import org.apache.kylin.metadata.measure.LongMutable;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
@@ -87,7 +79,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
private ICuboidCollector resultCollector;
- private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
super(cubeDesc, dictionaryMap);
@@ -100,35 +91,16 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
- Map<String, Integer> measureIndexMap = Maps.newHashMap();
List<String> metricsAggrFuncsList = Lists.newArrayList();
for (int i = 0; i < measureCount; i++) {
MeasureDesc measureDesc = measureDescs[i];
metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
- measureIndexMap.put(measureDesc.getName(), i);
}
this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
- initTopNDisplayColDictionaryMap();
}
- private void initTopNDisplayColDictionaryMap() {
- topNDisplayColDictMap = Maps.newHashMap();
- for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
- MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = measureDesc.getFunction();
- if (func.isTopN()) {
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
- int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
- TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
- @SuppressWarnings("unchecked")
- Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
- assert dictionary != null;
- topNDisplayColDictMap.put(displayColIdx, dictionary);
- }
- }
- }
+
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
@@ -142,14 +114,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return gridTable;
}
- private Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
- BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
- BitSet dimension = new BitSet();
- dimension.set(0, bitSet.cardinality());
- BitSet metrics = new BitSet();
- metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
- return new Pair<ImmutableBitSet, ImmutableBitSet>(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
- }
@Override
public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
@@ -376,7 +340,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
int mbBefore = getSystemAvailMB();
int mbAfter = 0;
- Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+ Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
@@ -442,27 +406,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
- Pair<ImmutableBitSet, ImmutableBitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
- ImmutableBitSet parentDimensions = columnBitSets.getFirst();
- ImmutableBitSet measureColumns = columnBitSets.getSecond();
- ImmutableBitSet childDimensions = parentDimensions;
-
- long mask = Long.highestOneBit(parent.cuboidId);
- long childCuboidId = cuboidId;
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
- int index = 0;
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parent.cuboidId) > 0) {
- if ((mask & childCuboidId) == 0) {
- // this dim will be aggregated
- childDimensions = childDimensions.set(index, false);
- }
- index++;
- }
- mask = mask >> 1;
- }
-
- return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, childDimensions, measureColumns);
+ final Pair<ImmutableBitSet, ImmutableBitSet> allNeededColumns = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount);
+ return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
}
private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
@@ -557,12 +502,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
GTInfo info;
GTRecord record;
BlockingQueue<List<String>> input;
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter;
public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
this.info = info;
this.input = input;
this.record = new GTRecord(info);
+ this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc,
+ InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap),
+ info);
}
@Override
@@ -586,7 +534,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (currentObject.size() == 0)
throw new IllegalStateException();
- buildGTRecord(currentObject, record);
+ inMemCubeBuilderInputConverter.convert(currentObject, record);
return record;
}
@@ -611,86 +559,5 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return 0;
}
- private void buildGTRecord(List<String> row, GTRecord record) {
- Object[] dimensions = buildKey(row);
- Object[] metricsValues = buildValue(row);
- Object[] recordValues = new Object[dimensions.length + metricsValues.length];
- System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
- System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
- record.setValues(recordValues);
- }
-
- private Object[] buildKey(List<String> row) {
- int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
- Object[] key = new Object[keySize];
-
- for (int i = 0; i < keySize; i++) {
- key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
- }
-
- return key;
- }
-
- private Object[] buildValue(List<String> row) {
-
- Object[] values = new Object[measureCount];
- MeasureDesc measureDesc = null;
-
- for (int i = 0; i < measureCount; i++) {
- measureDesc = measureDescs[i];
-
- Object value = null;
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
- FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
- if (flatTableIdx == null) {
- value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
- } else if (function.isCount() || function.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- value = ONE;
- } else if (function.isTopN()) {
- // encode the key column with dict, and get the counter column;
- int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
- Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
- int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
- valueBuf.clear();
- valueBuf.putInt(displayColDict.getSizeOfId());
- valueBuf.putInt(keyColEncoded);
- if (flatTableIdx.length == 1) {
- // only displayCol, use 1.0 as counter
- valueBuf.putDouble(1.0);
- } else {
- // get the counter column value
- valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
- }
-
- value = measureCodec.getSerializer(i).valueOf(valueBuf.array());
-
- } else if (flatTableIdx.length == 1) {
- value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
- } else {
-
- byte[] result = null;
- for (int x = 0; x < flatTableIdx.length; x++) {
- byte[] split = toBytes(row.get(flatTableIdx[x]));
- if (result == null) {
- result = Arrays.copyOf(split, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- value = measureCodec.getSerializer(i).valueOf(result);
- }
- values[i] = value;
- }
- return values;
- }
-
- private byte[] toBytes(String v) {
- return v == null ? null : Bytes.toBytes(v);
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
new file mode 100644
index 0000000..d9099ce
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class InMemCubeBuilderInputConverter {
+
+ private static final LongMutable ONE = new LongMutable(1l);
+
+ private final CubeDesc cubeDesc;
+ private final CubeJoinedFlatTableDesc intermediateTableDesc;
+ private final MeasureDesc[] measureDescs;
+ private final MeasureCodec measureCodec;
+ private final int measureCount;
+ private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private final Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+ private final GTInfo gtInfo;
+
+
+ public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNDisplayColDictMap, GTInfo gtInfo) {
+ this.cubeDesc = cubeDesc;
+ this.gtInfo = gtInfo;
+ this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ this.measureCount = cubeDesc.getMeasures().size();
+ this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+ this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+ this.topNDisplayColDictMap = Preconditions.checkNotNull(topNDisplayColDictMap, "topNDisplayColDictMap cannot be null");
+ }
+
+ public final GTRecord convert(List<String> row) {
+ final GTRecord record = new GTRecord(gtInfo);
+ convert(row, record);
+ return record;
+ }
+
+ public final void convert(List<String> row, GTRecord record) {
+ Object[] dimensions = buildKey(row);
+ Object[] metricsValues = buildValue(row);
+ Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+ System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+ System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+ record.setValues(recordValues);
+ }
+
+ private Object[] buildKey(List<String> row) {
+ int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+ Object[] key = new Object[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ }
+
+ return key;
+ }
+
+ private Object[] buildValue(List<String> row) {
+
+ Object[] values = new Object[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ MeasureDesc measureDesc = measureDescs[i];
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+ FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
+ if (flatTableIdx == null) {
+ values[i] = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+ } else if (function.isCount() || function.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ values[i] = ONE;
+ } else if (function.isTopN()) {
+ // encode the key column with dict, and get the counter column;
+ int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
+ Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
+ int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
+ valueBuf.clear();
+ valueBuf.putInt(displayColDict.getSizeOfId());
+ valueBuf.putInt(keyColEncoded);
+ if (flatTableIdx.length == 1) {
+ // only displayCol, use 1.0 as counter
+ valueBuf.putDouble(1.0);
+ } else {
+ // get the counter column value
+ valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
+ }
+
+ values[i] = measureCodec.getSerializer(i).valueOf(valueBuf.array());
+
+ } else if (flatTableIdx.length == 1) {
+ values[i] = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+ } else {
+
+ byte[] result = null;
+ for (int x = 0; x < flatTableIdx.length; x++) {
+ byte[] split = toBytes(row.get(flatTableIdx[x]));
+ if (result == null) {
+ result = Arrays.copyOf(split, split.length);
+ } else {
+ byte[] newResult = new byte[result.length + split.length];
+ System.arraycopy(result, 0, newResult, 0, result.length);
+ System.arraycopy(split, 0, newResult, result.length, split.length);
+ result = newResult;
+ }
+ }
+ values[i] = measureCodec.getSerializer(i).valueOf(result);
+ }
+ }
+ return values;
+ }
+
+ private byte[] toBytes(String v) {
+ return v == null ? null : Bytes.toBytes(v);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
new file mode 100644
index 0000000..3b68d47
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public final class InMemCubeBuilderUtils {
+
+ public static final HashMap<Integer, Dictionary<String>> createTopNDisplayColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ HashMap<Integer, Dictionary<String>> result = Maps.newHashMap();
+ for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
+ MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
+ FunctionDesc func = measureDesc.getFunction();
+ if (func.isTopN()) {
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+ int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
+ TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+ @SuppressWarnings("unchecked")
+ Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
+ result.put(displayColIdx, Preconditions.checkNotNull(dictionary));
+ }
+ }
+ return result;
+ }
+
+ public static final Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(final long cuboidId, final int measureCount) {
+ BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+ BitSet dimension = new BitSet();
+ dimension.set(0, bitSet.cardinality());
+ BitSet metrics = new BitSet();
+ metrics.set(bitSet.cardinality(), bitSet.cardinality() + measureCount);
+ return Pair.newPair(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
+ }
+
+ public static final Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(final long baseCuboidId, final long childCuboidId, final int measureCount) {
+ final Pair<ImmutableBitSet, ImmutableBitSet> parentDimensionAndMetricColumnBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
+ ImmutableBitSet parentDimensions = parentDimensionAndMetricColumnBitSet.getFirst();
+ ImmutableBitSet measureColumns = parentDimensionAndMetricColumnBitSet.getSecond();
+ ImmutableBitSet childDimensions = parentDimensions;
+ long mask = Long.highestOneBit(baseCuboidId);
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
+ int index = 0;
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & baseCuboidId) > 0) {
+ if ((mask & childCuboidId) == 0) {
+ // this dim will be aggregated
+ childDimensions = childDimensions.set(index, false);
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+ return Pair.newPair(childDimensions, measureColumns);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 0e5081e..e4fcc16 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -17,9 +17,8 @@
*/
package org.apache.kylin.engine.spark;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.base.*;
+import com.google.common.collect.*;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -28,6 +27,7 @@ import com.google.common.primitives.UnsignedBytes;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -54,6 +54,7 @@ import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.*;
import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.*;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.engine.mr.HadoopUtil;
@@ -78,15 +79,18 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
-
+import org.reflections.Reflections;
import scala.Tuple2;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Executors;
@@ -143,12 +147,12 @@ public class SparkCubing extends AbstractApplication {
final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final String[] columns = intermediateTable.columns();
- long start = System.currentTimeMillis();
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
- RowKeyDesc rowKey = cubeDesc.getRowkey();
+ final long start = System.currentTimeMillis();
+ final RowKeyDesc rowKey = cubeDesc.getRowkey();
for (int i = 0; i < baseCuboidColumn.size(); i++) {
TblColRef col = baseCuboidColumn.get(i);
if (!rowKey.isUseDictionary(col)) {
@@ -217,12 +221,13 @@ public class SparkCubing extends AbstractApplication {
}
CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
+
final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-
final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+
for (Long cuboidId : allCuboidIds) {
BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
@@ -238,16 +243,15 @@ public class SparkCubing extends AbstractApplication {
}
allCuboidsBitSet.put(cuboidId, cuboidBitSet);
}
- final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
for (int i = 0; i < nRowKey; ++i) {
row_hashcodes[i] = new ByteArray();
}
-
+
final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
new Function2<HashMap<Long, HyperLogLogPlusCounter>,
List<String>,
HashMap<Long, HyperLogLogPlusCounter>>() {
-
+
final HashFunction hashFunction = Hashing.murmur3_32();
@Override
@@ -319,7 +323,7 @@ public class SparkCubing extends AbstractApplication {
}
}
}
-
+
for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
if (measureDesc.getFunction().isTopN()) {
List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
@@ -328,7 +332,7 @@ public class SparkCubing extends AbstractApplication {
}
}
-
+
final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
@Override
@@ -375,7 +379,7 @@ public class SparkCubing extends AbstractApplication {
writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
return url;
}
-
+
private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
@Override
@@ -505,13 +509,67 @@ public class SparkCubing extends AbstractApplication {
}
}
+ private Collection<String> getKyroClasses() {
+ Set<Class> kyroClasses = Sets.newHashSet();
+ kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+ kyroClasses.add(HashMap.class);
+ kyroClasses.add(org.apache.spark.sql.Row[].class);
+ kyroClasses.add(org.apache.spark.sql.Row.class);
+ kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class);
+ kyroClasses.add(org.apache.spark.sql.types.StructType.class);
+ kyroClasses.add(org.apache.spark.sql.types.StructField[].class);
+ kyroClasses.add(org.apache.spark.sql.types.StructField.class);
+ kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
+ kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
+ kyroClasses.add(Object[].class);
+ kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
+ kyroClasses.add(Hashing.murmur3_128().getClass());
+ kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class);
+ kyroClasses.add(byte[][].class);
+ kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
+ kyroClasses.add(scala.math.BigDecimal.class);
+ kyroClasses.add(java.math.BigDecimal.class);
+ kyroClasses.add(java.math.MathContext.class);
+ kyroClasses.add(java.math.RoundingMode.class);
+ kyroClasses.add(java.util.ArrayList.class);
+ kyroClasses.add(java.util.LinkedList.class);
+
+
+ ArrayList<String> result = Lists.newArrayList();
+ for (Class kyroClass : kyroClasses) {
+ result.add(kyroClass.getName());
+ }
+ result.add("scala.collection.immutable.Map$EmptyMap$");
+ result.add("org.apache.spark.sql.catalyst.expressions.GenericInternalRow");
+ result.add("org.apache.spark.unsafe.types.UTF8String");
+ return result;
+ }
+
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
SparkConf conf = new SparkConf().setAppName("Simple Application");
+ //memory conf
conf.set("spark.executor.memory", "6g");
conf.set("spark.storage.memoryFraction", "0.3");
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrationRequired", "true");
+ final Iterable<String> allClasses = Iterables.filter(
+ Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()),
+ new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return input != null && input.trim().length() > 0;
+ }
+ });
+ System.out.println("kyro classes:" + allClasses.toString());
+ conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));
+
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext sqlContext = new HiveContext(sc.sc());
final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);