You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:10 UTC
[08/18] kylin git commit: APACHE-KYLIN-2732: Introduce base cuboid as
a new input for cubing job
APACHE-KYLIN-2732: Introduce base cuboid as a new input for cubing job
Signed-off-by: Zhong <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e83a2e5d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e83a2e5d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e83a2e5d
Branch: refs/heads/ci-dong
Commit: e83a2e5dc4a33beba7d0d9108057df070df97596
Parents: 07b7f82
Author: Wang Ken <mi...@ebay.com>
Authored: Mon Aug 28 11:34:17 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 33 +++
.../cube/cuboid/TreeCuboidSchedulerManager.java | 102 +++++++++
.../inmemcubing/AbstractInMemCubeBuilder.java | 13 +-
.../ConsumeBlockingQueueController.java | 84 ++++++++
.../cube/inmemcubing/DoggedCubeBuilder.java | 117 ++--------
.../cube/inmemcubing/InMemCubeBuilder.java | 87 ++------
.../InMemCubeBuilderInputConverter.java | 149 -------------
.../kylin/cube/inmemcubing/InputConverter.java | 69 ++++++
.../cube/inmemcubing/InputConverterUnit.java | 33 +++
.../InputConverterUnitForBaseCuboid.java | 49 +++++
.../InputConverterUnitForRawData.java | 159 ++++++++++++++
.../RecordConsumeBlockingQueueController.java | 91 ++++++++
.../org/apache/kylin/gridtable/GTRecord.java | 5 +
.../engine/mr/common/AbstractHadoopJob.java | 12 +-
.../kylin/engine/mr/common/BatchConstants.java | 1 +
.../engine/mr/steps/InMemCuboidMapper.java | 129 +++--------
.../engine/mr/steps/InMemCuboidMapperBase.java | 216 +++++++++++++++++++
.../ITDoggedCubeBuilderStressTest.java | 3 +-
.../inmemcubing/ITDoggedCubeBuilderTest.java | 3 +-
.../inmemcubing/ITInMemCubeBuilderTest.java | 14 +-
20 files changed, 944 insertions(+), 425 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index cc56727..f6eceb6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,9 @@
package org.apache.kylin.cube;
+import static org.apache.kylin.cube.cuboid.CuboidModeEnum.CURRENT;
+import static org.apache.kylin.cube.cuboid.CuboidModeEnum.RECOMMEND;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -29,6 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
@@ -327,6 +331,35 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
this.createTimeUTC = createTimeUTC;
}
+    /**
+     * Resolves a cuboid mode from its name and delegates to
+     * {@link #getCuboidsByMode(CuboidModeEnum)}.
+     *
+     * @param cuboidModeName name handed to {@code CuboidModeEnum.getByModeName}; when
+     *                       {@code null}, a {@code null} mode is forwarded instead
+     * @return the result of {@link #getCuboidsByMode(CuboidModeEnum)} for the resolved mode
+     */
+    public Set<Long> getCuboidsByMode(String cuboidModeName) {
+        CuboidModeEnum resolvedMode = null;
+        if (cuboidModeName != null) {
+            resolvedMode = CuboidModeEnum.getByModeName(cuboidModeName);
+        }
+        return getCuboidsByMode(resolvedMode);
+    }
+
+    /**
+     * Returns the cuboid ids selected by the given mode.
+     * <ul>
+     * <li>{@code null} or {@code CURRENT}: all cuboid ids of the cuboid scheduler.</li>
+     * <li>{@code RECOMMEND}: the recommended cuboids as returned by
+     * {@code getCuboidsRecommend()}.</li>
+     * <li>{@code RECOMMEND_EXISTING}: recommended cuboids that are also current.</li>
+     * <li>{@code RECOMMEND_MISSING}: recommended cuboids that are not current.</li>
+     * <li>{@code RECOMMEND_MISSING_WITH_BASE}: same as {@code RECOMMEND_MISSING},
+     * plus the base cuboid id.</li>
+     * </ul>
+     * The set operations are applied in place on the set obtained from
+     * {@code getCuboidsRecommend()}.
+     * NOTE(review): presumably that getter hands out a fresh set on every call - confirm,
+     * otherwise callers share the mutation.
+     *
+     * @param cuboidMode the mode to evaluate; may be {@code null}
+     * @return the selected cuboid ids; {@code null} when no recommendation is available
+     *         for a non-current mode, or when the mode is not handled here
+     */
+    public Set<Long> getCuboidsByMode(CuboidModeEnum cuboidMode) {
+        if (cuboidMode == null || cuboidMode == CURRENT) {
+            return getCuboidScheduler().getAllCuboidIds();
+        }
+        Set<Long> cuboidsRecommend = getCuboidsRecommend();
+        if (cuboidsRecommend == null || cuboidMode == RECOMMEND) {
+            return cuboidsRecommend;
+        }
+        Set<Long> currentCuboids = getCuboidScheduler().getAllCuboidIds();
+        switch (cuboidMode) {
+        case RECOMMEND_EXISTING:
+            cuboidsRecommend.retainAll(currentCuboids);
+            return cuboidsRecommend;
+        case RECOMMEND_MISSING:
+            cuboidsRecommend.removeAll(currentCuboids);
+            return cuboidsRecommend;
+        case RECOMMEND_MISSING_WITH_BASE:
+            cuboidsRecommend.removeAll(currentCuboids);
+            // The base cuboid must end up in the returned set; adding it to
+            // currentCuboids (as before) left the result without it.
+            cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId());
+            return cuboidsRecommend;
+        default:
+            return null;
+        }
+    }
+
public Map<Long, Long> getCuboids() {
if (cuboidBytes == null)
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
new file mode 100644
index 0000000..5e8d965
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Keeps one {@link TreeCuboidScheduler} per cube name in a static concurrent map.
+ * On construction a {@link Broadcaster} listener is registered under the "cube" entity;
+ * it clears the whole map or removes single entries when notified.
+ */
+public class TreeCuboidSchedulerManager {
+    private static final ConcurrentMap<String, TreeCuboidScheduler> cache = Maps.newConcurrentMap();
+
+    /** Invalidates cached schedulers in response to broadcast notifications. */
+    private class TreeCuboidSchedulerSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            cache.clear();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            // presumably cacheKey is the cube name used as key in getTreeCuboidScheduler(String) - confirm
+            cache.remove(cacheKey);
+        }
+    }
+
+    public TreeCuboidSchedulerManager() {
+        Broadcaster.getInstance(KylinConfig.getInstanceFromEnv())
+                .registerListener(new TreeCuboidSchedulerSyncListener(), "cube");
+    }
+
+    // Declared after 'cache' so the map exists before the constructor runs during class initialization.
+    private static final TreeCuboidSchedulerManager instance = new TreeCuboidSchedulerManager();
+
+    public static TreeCuboidSchedulerManager getInstance() {
+        return instance;
+    }
+
+    /**
+     * Returns the cached scheduler for a cube, building and caching it on first use.
+     *
+     * @param cubeName name of the cube to look up
+     * @return null if the cube cannot be found or has no pre-built cuboids
+     */
+    public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
+        TreeCuboidScheduler result = cache.get(cubeName);
+        if (result != null) {
+            return result;
+        }
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        if (cubeInstance == null) {
+            return null;
+        }
+        // Reuse the instance that was just null-checked instead of looking the cube up a second time.
+        result = getTreeCuboidScheduler(cubeInstance.getDescriptor(), cubeInstance.getCuboids());
+        if (result == null) {
+            return null;
+        }
+        cache.put(cubeName, result);
+        return result;
+    }
+
+    /**
+     * Builds a scheduler over all cuboids present in the given statistics map.
+     *
+     * @param cubeDesc          cube descriptor passed to the scheduler
+     * @param cuboidsWithRowCnt map from cuboid id to a long value (row count, going by the name)
+     * @return a new scheduler, or null if the map is null or empty
+     */
+    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
+        if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
+            return null;
+        }
+        return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
+    }
+
+    /**
+     * Builds a scheduler over the given cuboid ids, ordered by a
+     * {@code TreeCuboidScheduler.CuboidCostComparator} created from the statistics map.
+     *
+     * @param cubeDesc          cube descriptor passed to the scheduler
+     * @param cuboidIds         cuboid ids the scheduler should cover
+     * @param cuboidsWithRowCnt statistics handed to the cost comparator
+     * @return a new scheduler, or null if either the id list or the map is null
+     */
+    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
+            Map<Long, Long> cuboidsWithRowCnt) {
+        if (cuboidIds == null || cuboidsWithRowCnt == null) {
+            return null;
+        }
+        return new TreeCuboidScheduler(cubeDesc, cuboidIds,
+                new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 952926c..df1fa7a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -19,7 +19,6 @@
package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -78,12 +77,17 @@ abstract public class AbstractInMemCubeBuilder {
return this.reserveMemoryMB;
}
- public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
+ public Runnable buildAsRunnable(final BlockingQueue<String[]> input, final ICuboidWriter output) {
+ return buildAsRunnable(input, new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap), output);
+ }
+
+ public <T> Runnable buildAsRunnable(final BlockingQueue<T> input, final InputConverterUnit<T> inputConverterUnit,
+ final ICuboidWriter output) {
return new Runnable() {
@Override
public void run() {
try {
- build(input, output);
+ build(input, inputConverterUnit, output);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -91,7 +95,8 @@ abstract public class AbstractInMemCubeBuilder {
};
}
- abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
+ abstract public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit,
+ ICuboidWriter output) throws IOException;
protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
new file mode 100644
index 0000000..a9e55f7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Iterator over a {@link BlockingQueue} that fetches elements in batches.
+ * When the current batch is exhausted, {@link #hasNext()} takes one element from the
+ * queue (waiting if necessary) and then drains up to {@code batchSize - 1} more without
+ * waiting. The iterator state is not synchronized; only the exception flag is volatile.
+ *
+ * @param <T> element type of the underlying queue
+ */
+public class ConsumeBlockingQueueController<T> implements Iterator<T> {
+    public final static int DEFAULT_BATCH_SIZE = 1000;
+
+    private volatile boolean hasException = false;
+    private final BlockingQueue<T> input;
+    private final int batchSize;
+    private final List<T> batchBuffer;
+    private Iterator<T> internalIT;
+
+    private final AtomicInteger outputRowCount = new AtomicInteger();
+
+    /**
+     * @param input     queue to consume from
+     * @param batchSize maximum number of elements fetched per refill
+     */
+    public ConsumeBlockingQueueController(BlockingQueue<T> input, int batchSize) {
+        this.input = input;
+        this.batchSize = batchSize;
+        this.batchBuffer = Lists.newArrayListWithExpectedSize(batchSize);
+        this.internalIT = batchBuffer.iterator();
+    }
+
+    /**
+     * Reports whether another element can be returned, refilling the batch when needed.
+     * In this class the method only returns {@code false} after {@link #findException()}
+     * has been called; otherwise it waits on the queue until an element arrives.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
+     *                          interrupted; the thread's interrupt status is restored first
+     */
+    @Override
+    public boolean hasNext() {
+        if (hasException) {
+            return false;
+        }
+        if (internalIT.hasNext()) {
+            return true;
+        }
+
+        batchBuffer.clear();
+        try {
+            batchBuffer.add(input.take());
+            outputRowCount.incrementAndGet();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        outputRowCount.addAndGet(input.drainTo(batchBuffer, batchSize - 1));
+        internalIT = batchBuffer.iterator();
+        return true;
+    }
+
+    /** Returns the next element of the current batch; call {@link #hasNext()} first to refill. */
+    @Override
+    public T next() {
+        return internalIT.next();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Marks the consumer as failed so that subsequent {@link #hasNext()} calls return {@code false}. */
+    public void findException() {
+        hasException = true;
+    }
+
+    /** @return number of elements pulled from the queue so far (taken plus drained) */
+    public long getOutputRowCount() {
+        return outputRowCount.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index dd92a2b..ccd7137 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -20,21 +20,17 @@ package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
@@ -55,7 +51,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
private int splitRowThreshold = Integer.MAX_VALUE;
- private int unitRows = 1000;
+ private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -72,8 +68,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
@Override
- public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
- new BuildOnce().build(input, output);
+ public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+ throws IOException {
+ new BuildOnce().build(input, inputConverterUnit, output);
}
private class BuildOnce {
@@ -81,7 +78,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
BuildOnce() {
}
- public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+ public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+ throws IOException {
final List<SplitThread> splits = new ArrayList<SplitThread>();
final Merger merger = new Merger();
@@ -89,32 +87,23 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
logger.info("Dogged Cube Build start");
try {
- SplitThread last = null;
- boolean eof = false;
+ while (true) {
+ SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController
+ .getQueueController(inputConverterUnit, input, unitRows));
+ splits.add(last);
- while (!eof) {
+ last.start();
+ logger.info("Split #" + splits.size() + " kickoff");
- if (last != null && shouldCutSplit(splits)) {
- cutSplit(last);
- last = null;
- }
+ // Build splits sequentially
+ last.join();
checkException(splits);
-
- if (last == null) {
- last = new SplitThread();
- splits.add(last);
- last.start();
- logger.info("Split #" + splits.size() + " kickoff");
+ if (last.inputController.ifEnd()) {
+ break;
}
-
- eof = feedSomeInput(input, last, unitRows);
}
- for (SplitThread split : splits) {
- split.join();
- }
- checkException(splits);
logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
merger.mergeAndOutput(splits, output);
@@ -202,81 +191,18 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
}
}
-
- private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
- try {
- int i = 0;
- while (i < n) {
- List<String> record = input.take();
- i++;
-
- while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) {
- if (split.exception != null)
- return true; // got some error
- }
- split.inputRowCount++;
-
- if (record == null || record.isEmpty()) {
- return true;
- }
- }
- return false;
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- private void cutSplit(SplitThread last) {
- try {
- // signal the end of input
- while (last.isAlive()) {
- if (last.inputQueue.offer(Collections.<String> emptyList())) {
- break;
- }
- Thread.sleep(1000);
- }
-
- // wait cuboid build done
- last.join();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- private boolean shouldCutSplit(List<SplitThread> splits) {
- int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
- int nSplit = splits.size();
- long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
-
- logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
-
- if (splitRowCount >= splitRowThreshold) {
- logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold);
- return true;
- }
-
- if (systemAvailMB <= reserveMemoryMB * 1.5) {
- logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + "*1.5 MB");
- return true;
- }
-
- return false;
- }
}
private class SplitThread extends Thread {
- final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
+ final RecordConsumeBlockingQueueController<?> inputController;
final InMemCubeBuilder builder;
ConcurrentNavigableMap<Long, CuboidResult> buildResult;
- long inputRowCount = 0;
RuntimeException exception;
- public SplitThread() {
+ public SplitThread(final int num, final RecordConsumeBlockingQueueController<?> inputController) {
+ super("SplitThread" + num);
+ this.inputController = inputController;
this.builder = new InMemCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
this.builder.setConcurrentThreads(taskThreadCount);
this.builder.setReserveMemoryMB(reserveMemoryMB);
@@ -285,12 +211,13 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
@Override
public void run() {
try {
- buildResult = builder.build(inputQueue);
+ buildResult = builder.build(inputController);
} catch (Exception e) {
if (e instanceof RuntimeException)
this.exception = (RuntimeException) e;
else
this.exception = new RuntimeException(e);
+ inputController.findException();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 684c26b..f63b53f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -87,6 +87,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
private ICuboidCollector resultCollector;
+ private boolean ifBaseCuboidCollected = true;
public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -121,8 +122,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
@Override
- public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
- ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
+ public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+ throws IOException {
+ ConcurrentNavigableMap<Long, CuboidResult> result = build(
+ RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input));
try {
for (CuboidResult cuboidResult : result.values()) {
outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
@@ -133,7 +136,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
}
- public ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+ public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
+ throws IOException {
+ if (input.inputConverterUnit instanceof InputConverterUnitForBaseCuboid) {
+ ifBaseCuboidCollected = false;
+ }
final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
build(input, new ICuboidCollector() {
@Override
@@ -150,7 +157,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
void collect(CuboidResult result);
}
- private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
+ private <T> void build(RecordConsumeBlockingQueueController<T> input, ICuboidCollector collector)
+ throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
@@ -326,7 +334,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
memBudget = new MemoryBudgetController(budget);
}
- private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
+ private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("Calculating base cuboid " + baseCuboidId);
@@ -356,10 +364,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
- return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0);
+ return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, ifBaseCuboidCollected);
}
private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+ return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true);
+ }
+
+ private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB,
+ boolean ifCollect) {
if (aggrCacheMB <= 0 && baseResult != null) {
aggrCacheMB = (int) Math.round(//
(DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) //
@@ -369,7 +382,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
taskCuboidCompleted.incrementAndGet();
- resultCollector.collect(result);
+ if (ifCollect) {
+ resultCollector.collect(result);
+ }
return result;
}
@@ -508,62 +523,4 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
}
}
-
- // ============================================================================
-
- private class InputConverter implements IGTScanner {
- GTInfo info;
- GTRecord record;
- BlockingQueue<List<String>> input;
- final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter;
-
- public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
- this.info = info;
- this.input = input;
- this.record = new GTRecord(info);
- this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, dictionaryMap, info);
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- List<String> currentObject = null;
-
- @Override
- public boolean hasNext() {
- try {
- currentObject = input.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- return currentObject != null && currentObject.size() > 0;
- }
-
- @Override
- public GTRecord next() {
- if (currentObject.size() == 0)
- throw new IllegalStateException();
-
- inMemCubeBuilderInputConverter.convert(currentObject, record);
- return record;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
deleted file mode 100644
index 6dd20d8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.cube.inmemcubing;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class InMemCubeBuilderInputConverter {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class);
-
- public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-
- private final CubeJoinedFlatTableEnrich flatDesc;
- private final MeasureDesc[] measureDescs;
- private final MeasureIngester<?>[] measureIngesters;
- private final int measureCount;
- private final Map<TblColRef, Dictionary<String>> dictionaryMap;
- private final GTInfo gtInfo;
- protected List<byte[]> nullBytes;
-
- public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
- this.gtInfo = gtInfo;
- this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc);
- this.measureCount = cubeDesc.getMeasures().size();
- this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
- this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
- this.dictionaryMap = dictionaryMap;
- initNullBytes(cubeDesc);
- }
-
- public final GTRecord convert(List<String> row) {
- final GTRecord record = new GTRecord(gtInfo);
- convert(row, record);
- return record;
- }
-
- public final void convert(List<String> row, GTRecord record) {
- Object[] dimensions = buildKey(row);
- Object[] metricsValues = buildValue(row);
- Object[] recordValues = new Object[dimensions.length + metricsValues.length];
- System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
- System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
- record.setValues(recordValues);
- }
-
- private Object[] buildKey(List<String> row) {
- int keySize = flatDesc.getRowKeyColumnIndexes().length;
- Object[] key = new Object[keySize];
-
- for (int i = 0; i < keySize; i++) {
- key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
- if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
- key[i] = null;
- }
- }
-
- return key;
- }
-
- private Object[] buildValue(List<String> row) {
- Object[] values = new Object[measureCount];
- for (int i = 0; i < measureCount; i++) {
- values[i] = buildValueOf(i, row);
- }
- return values;
- }
-
- private Object buildValueOf(int idxOfMeasure, List<String> row) {
- MeasureDesc measure = measureDescs[idxOfMeasure];
- FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
- int paramCount = function.getParameterCount();
- String[] inputToMeasure = new String[paramCount];
-
- // pick up parameter values
- ParameterDesc param = function.getParameter();
- int paramColIdx = 0; // index among parameters of column type
- for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
- String value;
- if (function.isCount()) {
- value = "1";
- } else if (param.isColumnType()) {
- value = row.get(colIdxOnFlatTable[paramColIdx++]);
- } else {
- value = param.getValue();
- }
- inputToMeasure[i] = value;
- }
-
- return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
- }
-
- private void initNullBytes(CubeDesc cubeDesc) {
- nullBytes = Lists.newArrayList();
- nullBytes.add(HIVE_NULL);
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullBytes.add(Bytes.toBytes(s));
- }
- }
- }
-
- private boolean isNull(byte[] v) {
- for (byte[] nullByte : nullBytes) {
- if (Bytes.equals(v, nullByte))
- return true;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
new file mode 100644
index 0000000..664f784
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+public class InputConverter<T> implements IGTScanner { // IGTScanner whose records are produced by converting items pulled from a queue controller
+ private GTInfo info;
+ private GTRecord record; // single instance, overwritten and returned by every next() call
+ private RecordConsumeBlockingQueueController<T> inputController;
+
+ public InputConverter(GTInfo info, RecordConsumeBlockingQueueController<T> inputController) {
+ this.info = info;
+ this.inputController = inputController;
+ this.record = new GTRecord(info);
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ @Override
+ public boolean hasNext() {
+ return inputController.hasNext(); // the controller alone decides when this scan is over
+ }
+
+ @Override
+ public GTRecord next() {
+ inputController.inputConverterUnit.convert(inputController.next(), record);
+ return record; // NOTE: same object on every call; contents are replaced by the following next()
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException { // empty body: this class holds nothing to release
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
new file mode 100644
index 0000000..fe32937
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+public interface InputConverterUnit<T> { // converts one queued input item of type T into a GTRecord and supplies the end/cut marker items
+ public void convert(T currentObject, GTRecord record); // fills 'record' from currentObject
+
+ public boolean ifEnd(T currentObject); // true when currentObject is the end-of-input marker
+
+ public boolean ifCut(T currentObject); // true when currentObject is the cut marker
+
+ public T getEmptyUnit(); // marker a producer enqueues to signal end of input
+
+ public T getCutUnit(); // marker a producer enqueues to request a split cut
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
new file mode 100644
index 0000000..9110a87
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTRecord;
+
+public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteArray> { // converter for input items that are already serialized as ByteArray
+
+ public static final ByteArray EMPTY_ROW = new ByteArray(); // end-of-input marker, recognized by identity in ifEnd()
+ public static final ByteArray CUT_ROW = new ByteArray(0); // cut marker, a distinct instance recognized by identity in ifCut()
+
+ public void convert(ByteArray currentObject, GTRecord record) {
+ record.loadColumns(currentObject.asBuffer()); // record is loaded from the buffer view of the item
+ }
+
+ public boolean ifEnd(ByteArray currentObject) {
+ return currentObject == EMPTY_ROW; // reference comparison on purpose: only the marker instance itself matches
+ }
+
+ public ByteArray getEmptyUnit() {
+ return EMPTY_ROW;
+ }
+
+ public ByteArray getCutUnit() {
+ return CUT_ROW;
+ }
+
+ @Override
+ public boolean ifCut(ByteArray currentObject) {
+ return currentObject == CUT_ROW; // reference comparison, same reasoning as ifEnd()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
new file mode 100644
index 0000000..f6548b2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/** Converts one raw row (String[]) into GTRecord values: row-key columns first, followed by one value per measure.
+ */
+public class InputConverterUnitForRawData implements InputConverterUnit<String[]> {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
+
+ public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); // the two characters \N; always registered as a null marker by initNullBytes()
+ public static final String[] EMPTY_ROW = new String[0]; // end-of-input marker, recognized by identity in ifEnd()
+ public static final String[] CUT_ROW = { "" }; // cut marker, recognized by identity in ifCut()
+
+ private final CubeJoinedFlatTableEnrich flatDesc;
+ private final MeasureDesc[] measureDescs;
+ private final MeasureIngester<?>[] measureIngesters;
+ private final int measureCount;
+ private final Map<TblColRef, Dictionary<String>> dictionaryMap;
+ protected List<byte[]> nullBytes; // byte forms of every string that buildKey() treats as null
+
+ public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc,
+ Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc); // wrapped to obtain the row-key and measure column indexes used below
+ this.measureCount = cubeDesc.getMeasures().size();
+ this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+ this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+ this.dictionaryMap = dictionaryMap;
+ initNullBytes(cubeDesc);
+ }
+
+ public final void convert(String[] row, GTRecord record) {
+ Object[] dimensions = buildKey(row);
+ Object[] metricsValues = buildValue(row);
+ Object[] recordValues = new Object[dimensions.length + metricsValues.length]; // dimensions followed by metrics
+ System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+ System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+ record.setValues(recordValues);
+ }
+
+ public boolean ifEnd(String[] currentObject) {
+ return currentObject == EMPTY_ROW; // reference comparison: only the marker instance itself matches
+ }
+
+ public boolean ifCut(String[] currentObject) {
+ return currentObject == CUT_ROW; // reference comparison: a data row equal to { "" } is not a cut
+ }
+
+ public String[] getEmptyUnit() {
+ return EMPTY_ROW;
+ }
+
+ public String[] getCutUnit() {
+ return CUT_ROW;
+ }
+
+ private Object[] buildKey(String[] row) { // picks the row-key columns out of the row, mapping null markers to null
+ int keySize = flatDesc.getRowKeyColumnIndexes().length;
+ Object[] key = new Object[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]];
+ if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) { // value equals one of the configured null strings
+ key[i] = null;
+ }
+ }
+
+ return key;
+ }
+
+ private Object[] buildValue(String[] row) { // one ingested value per measure, in measure order
+ Object[] values = new Object[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ values[i] = buildValueOf(i, row);
+ }
+ return values;
+ }
+
+ private Object buildValueOf(int idxOfMeasure, String[] row) { // gathers the measure's parameter inputs and hands them to its ingester
+ MeasureDesc measure = measureDescs[idxOfMeasure];
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ String[] inputToMeasure = new String[paramCount];
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int paramColIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount()) {
+ value = "1"; // count ignores the row content: every parameter input is the constant "1"
+ } else if (param.isColumnType()) {
+ value = row[colIdxOnFlatTable[paramColIdx++]];
+ } else {
+ value = param.getValue(); // non-column parameter: use the value stored on the parameter itself
+ }
+ inputToMeasure[i] = value;
+ }
+
+ return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+ }
+
+ private void initNullBytes(CubeDesc cubeDesc) { // HIVE_NULL plus any null strings returned by the cube descriptor
+ nullBytes = Lists.newArrayList();
+ nullBytes.add(HIVE_NULL);
+ String[] nullStrings = cubeDesc.getNullStrings();
+ if (nullStrings != null) {
+ for (String s : nullStrings) {
+ nullBytes.add(Bytes.toBytes(s));
+ }
+ }
+ }
+
+ private boolean isNull(byte[] v) { // true when v matches any registered null marker byte-for-byte
+ for (byte[] nullByte : nullBytes) {
+ if (Bytes.equals(v, nullByte))
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
new file mode 100644
index 0000000..49cbe1f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+
+public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> { // adds end-marker, cut-marker and forced-cut handling on top of the parent controller
+
+ public final InputConverterUnit<T> inputConverterUnit; // supplies the ifEnd/ifCut tests used by hasNext()
+
+ private RecordConsumeBlockingQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize) {
+ super(input, batchSize);
+ this.inputConverterUnit = inputConverterUnit;
+ }
+
+ private T currentObject = null; // look-ahead slot: filled by hasNext(), emptied by next()
+ private volatile boolean ifEnd = false; // set once the end marker has been seen
+ private volatile boolean cut = false; // set by forceCutPipe(); never reset inside this class
+ private long outputRowCountCut = 0L; // getOutputRowCount() snapshot taken at forceCutPipe()
+
+ @Override
+ public boolean hasNext() {
+ if (currentObject != null) {
+ return hasNext(currentObject); // re-evaluate the item already pulled but not yet consumed
+ }
+ if (!super.hasNext()) {
+ return false;
+ }
+ currentObject = super.next();
+ return hasNext(currentObject);
+ }
+
+ @Override
+ public T next() {
+ if (ifEnd())
+ throw new IllegalStateException();
+
+ T result = currentObject; // NOTE: null unless a preceding hasNext() call filled the look-ahead slot
+ currentObject = null;
+ return result;
+ }
+
+ public boolean ifEnd() {
+ return ifEnd;
+ }
+
+ private boolean hasNext(T object) { // false for the end marker, while a forced cut is active, or for the cut marker
+ if (inputConverterUnit.ifEnd(object)) {
+ ifEnd = true;
+ return false;
+ }else if(cut){
+ return false;
+ }else if(inputConverterUnit.ifCut(object)){
+ return false; // NOTE(review): the cut marker stays in currentObject until next() is called — confirm the consumer drains it
+ }
+ return true;
+ }
+
+ public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
+ return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
+ }
+
+ public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){
+ return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize);
+ }
+
+ public void forceCutPipe() { // makes hasNext() report false for non-end items and records the current output row count
+ cut = true;
+ outputRowCountCut = getOutputRowCount();
+ }
+
+ public long getOutputRowCountAfterCut() {
+ return getOutputRowCount() - outputRowCountCut; // rows counted since forceCutPipe(); the full count if it was never called
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index ba3b1c4..36bd095 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -251,6 +251,11 @@ public class GTRecord implements Comparable<GTRecord> {
loadColumns(info.colBlocks[c], buf);
}
+ /** change pointers of all columns (info.colAll) to point to data in given buffer, UNLIKE deserialize */
+ public void loadColumns(ByteBuffer buf) {
+ loadColumns(info.colAll, buf);
+ }
+
/**
* Change pointers to point to data in given buffer, UNLIKE deserialize
* @param selectedCols positions of column to load
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 1756251..fd212be 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -506,9 +506,19 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+ attachSegmentMetadata(segment, conf, true, true);
+ }
+
+ protected void attachSegmentMetadata(CubeSegment segment, Configuration conf, boolean ifDictIncluded,
+ boolean ifStatsIncluded) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
- dumpList.addAll(segment.getDictionaryPaths());
+ if (ifDictIncluded) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ }
+ if (ifStatsIncluded) {
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
dumpKylinPropsAndMetadata(segment.getProject(), dumpList, segment.getConfig(), conf);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 25a67f9..aaf2654 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -29,6 +29,7 @@ public interface BatchConstants {
* ConFiGuration entry names for MR jobs
*/
+ String CFG_CUBOID_MODE = "cuboid.mode";
String CFG_CUBE_NAME = "cube.name";
String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
String CFG_CUBE_SEGMENT_ID = "cube.segment.id";
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index a7b3923..859e126 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -19,133 +19,60 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
-/**
- */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapper.class);
+public class InMemCuboidMapper<KEYIN>
+ extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment cubeSegment;
- private IMRTableInputFormat flatTableInputFormat;
- private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
- private Future<?> future;
+ private IMRInput.IMRTableInputFormat flatTableInputFormat;
@Override
protected void doSetup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
+ super.doSetup(context);
- Configuration conf = context.getConfiguration();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
- cubeSegment = cube.getSegmentById(segmentID);
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
- IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-
- Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-
- // dictionary
- for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
- Dictionary<?> dict = cubeSegment.getDictionary(col);
- if (dict == null) {
- logger.warn("Dictionary for " + col + " was not found.");
- }
-
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
-
- int taskCount = config.getCubeAlgorithmInMemConcurrentThreads();
- DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeSegment.getCuboidScheduler(), flatDesc,
- dictionaryMap);
- cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
- cubeBuilder.setConcurrentThreads(taskCount);
-
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
-
}
- private int calculateReserveMB(Configuration configuration) {
- int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
- int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
- int sysReserve = Math.max(sysAvailMB / 10, 100);
- int reserveMB = mrReserve + sysReserve;
- logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
- return reserveMB;
+ @Override
+ protected InputConverterUnit<String[]> getInputConverterUnit() {
+ Preconditions.checkNotNull(cubeDesc);
+ Preconditions.checkNotNull(dictionaryMap);
+ return new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap);
}
@Override
- public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- // put each row to the queue
- Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
-
- for(String[] row: rowCollection) {
- List<String> rowAsList = Arrays.asList(row);
- while (!future.isDone()) {
- if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
- break;
- }
- }
- }
+ protected String[] getRecordFromKeyValue(KEYIN key, Object value) {
+ return flatTableInputFormat.parseMapperInput(value).iterator().next();
}
@Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- logger.info("Totally handled " + mapCounter + " records!");
-
- while (!future.isDone()) {
- if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
- break;
- }
- }
-
- try {
- future.get();
- } catch (Exception e) {
- throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
- }
- queue.clear();
+ protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+ int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<String[]> inputConverterUnit) {
+ AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+ cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+ cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
+ return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
+ new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
new file mode 100644
index 0000000..43f95e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
+import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/** Shared setup, queue feeding and cleanup for in-memory cubing mappers; subclasses supply the record type T, its converter and the cubing thread.
+ */
+public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapperBase.class);
+
+ private int reserveMemoryMB; // memory threshold computed once in doSetup(), compared against available memory in shouldCutSplit()
+ private int nSplit = 1; // number of the split currently being filled; only used in log output
+ private int countOfLastSplit = 0; // rows offered since the last cut marker
+ private int counter = 0; // rows offered in total
+ private int splitRowThreshold = Integer.MAX_VALUE; // never reassigned in this class
+ private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE; // doMap() evaluates shouldCutSplit() once per this many rows
+
+ protected CubeInstance cube;
+ protected CubeDesc cubeDesc;
+ protected CubeSegment cubeSegment;
+ protected Map<TblColRef, Dictionary<String>> dictionaryMap;
+ protected IJoinedFlatTableDesc flatDesc;
+
+ protected int taskThreadCount;
+ protected BlockingQueue<T> queue = new LinkedBlockingQueue<>(2000); // bounded hand-off queue filled by doMap()/doCleanup()
+ protected InputConverterUnit<T> inputConverterUnit;
+ private Future<?> future; // handle returned by getCubingThreadFuture()
+
+ protected abstract InputConverterUnit<T> getInputConverterUnit();
+
+ protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, int reserveMemoryMB, //
+ CuboidScheduler cuboidScheduler, InputConverterUnit<T> inputConverterUnit); // NOTE(review): raw Future return type; the result is stored in a Future<?> field
+
+ protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
+
+ @Override
+ protected void doSetup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ Configuration conf = context.getConfiguration();
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+ cubeSegment = cube.getSegmentById(segmentID);
+ flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+ dictionaryMap = Maps.newHashMap();
+
+ // dictionary
+ for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
+ Dictionary<?> dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ logger.warn("Dictionary for " + col + " was not found."); // only warns; the column is still put into the map below
+ }
+
+ dictionaryMap.put(col, cubeSegment.getDictionary(col)); // NOTE(review): second lookup of the same dictionary instead of reusing 'dict'
+ }
+
+ // check memory more often if a single row is big
+ if (cubeDesc.hasMemoryHungryMeasures()) {
+ unitRows /= 10;
+ }
+
+ String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE);
+ CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, //
+ CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment));
+ if (cuboidScheduler == null) { // no tree scheduler returned: fall back to the default one
+ cuboidScheduler = new DefaultCuboidScheduler(cubeDesc);
+ }
+
+ taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
+ reserveMemoryMB = calculateReserveMB(conf);
+ inputConverterUnit = getInputConverterUnit();
+ future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler, inputConverterUnit);
+ }
+
+ private int calculateReserveMB(Configuration configuration) { // MR sort buffer setting plus max(10% of available memory, 100) MB
+ int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
+ int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
+ int sysReserve = Math.max(sysAvailMB / 10, 100);
+ int reserveMB = mrReserve + sysReserve;
+ logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
+ return reserveMB;
+ }
+
+ @Override
+ public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
+ // put each row to the queue
+ T row = getRecordFromKeyValue(key, value);
+
+ if (offer(context, row, 1, TimeUnit.MINUTES, 60)) { // up to 60 attempts of one minute each
+ counter++;
+ countOfLastSplit++;
+ if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handled " + counter + " records, internal queue size = " + queue.size());
+ }
+ } else {
+ throw new IOException("Failed to offer row to internal queue due to queue full!");
+ }
+
+ if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) { // cut check runs only once per unitRows rows
+ if (offer(context, inputConverterUnit.getCutUnit(), 1, TimeUnit.MINUTES, 60)) {
+ countOfLastSplit = 0;
+ } else {
+ throw new IOException("Failed to offer row to internal queue due to queue full!");
+ }
+ nSplit++;
+ }
+ }
+
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ logger.info("Totally handled " + mapCounter + " records!"); // mapCounter is not declared in this class (presumably inherited from KylinMapper — confirm)
+
+ while (!future.isDone()) { // retry enqueueing the end marker unless the cubing task has already finished
+ if (queue.offer(inputConverterUnit.getEmptyUnit(), 1, TimeUnit.SECONDS)) {
+ break;
+ }
+ }
+
+ futureGet(context); // wait for the cubing task; its failure is rethrown as IOException
+ queue.clear();
+ }
+
+ private boolean shouldCutSplit(int nSplit, long splitRowCount) { // true when the split hit the row threshold or available memory fell to the reserve
+ int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
+
+ logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold"); // logged at INFO on every call
+
+ if (splitRowCount >= splitRowThreshold) {
+ logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold);
+ return true;
+ }
+
+ if (systemAvailMB <= reserveMemoryMB) {
+ logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB");
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean offer(Context context, T row, long timeout, TimeUnit unit, int nRound) throws IOException, InterruptedException { // timed offer retried up to nRound times
+ while (nRound > 0) {
+ if (queue.offer(row, timeout, unit)) {
+ return true;
+ }
+ if (future.isDone()) { // the cubing task has finished, so further retries are pointless
+ futureGet(context); // throws if the cubing task failed
+ throw new IOException("Failed to build cube in mapper due to cubing thread exit unexpectedly"); // reached when the task finished without an exception
+ }
+ nRound--;
+ }
+ return false; // every round timed out while the task was still running
+ }
+
+ private void futureGet(Context context) throws IOException { // blocks on the cubing task and wraps any failure in an IOException
+ try {
+ future.get();
+ } catch (Exception e) { // NOTE(review): also catches InterruptedException without restoring the interrupt flag
+ throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 5e5b16a..fa2d792 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -19,7 +19,6 @@
package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -79,7 +78,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
@Test
public void test() throws Exception {
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
ExecutorService executorService = Executors.newSingleThreadExecutor();
long randSeed = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 1e10d79..0338da8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -83,7 +82,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
@Test
public void test() throws Exception {
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
ExecutorService executorService = Executors.newSingleThreadExecutor();
long randSeed = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e83a2e5d/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 49d267a..ad754cd 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -113,7 +113,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
//DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
cubeBuilder.setConcurrentThreads(nThreads);
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000);
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
@@ -144,11 +144,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
}
}
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count)
+ throws IOException, InterruptedException {
feedData(cube, flatTable, queue, count, 0);
}
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
+ long randSeed) throws IOException, InterruptedException {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
int nColumns = flatDesc.getAllColumns().size();
@@ -177,14 +179,14 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
// output with random data
for (; count > 0; count--) {
- ArrayList<String> row = new ArrayList<String>(nColumns);
+ String[] row = new String[nColumns];
for (int i = 0; i < nColumns; i++) {
String[] candidates = distincts.get(i);
- row.add(candidates[rand.nextInt(candidates.length)]);
+ row[i] = candidates[rand.nextInt(candidates.length)];
}
queue.put(row);
}
- queue.put(new ArrayList<String>(0));
+ queue.put(InputConverterUnitForRawData.EMPTY_ROW);
}
static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {