You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/29 11:16:35 UTC
incubator-kylin git commit: KYLIN-803 InMemCubeBuilder tolerate null measure value
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 5a4f5b979 -> 035642be6
KYLIN-803 InMemCubeBuilder tolerate null measure value
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/035642be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/035642be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/035642be
Branch: refs/heads/0.8
Commit: 035642be68cd174afba0ed89e0ba2f182853f206
Parents: 5a4f5b9
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jun 29 17:15:59 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jun 29 17:15:59 2015 +0800
----------------------------------------------------------------------
.../job/hadoop/cubev2/InMemCuboidMapper.java | 23 +++++++++++------
.../job/inmemcubing/DoggedCubeBuilder.java | 27 +++++++++++++++++---
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 8 ++++--
3 files changed, 45 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index e600b1d..497ee19 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -9,6 +9,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,10 +86,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
// put each row to the queue
List<String> row = HiveTableReader.getRowAsList(record);
- queue.put(row);
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
+ while (!future.isDone()) {
+ if (queue.offer(row, 1, TimeUnit.SECONDS)) {
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+ break;
+ }
}
}
@@ -96,16 +101,18 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
protected void cleanup(Context context) throws IOException, InterruptedException {
logger.info("Totally handled " + counter + " records!");
- queue.put(new ArrayList<String>(0));
+ while (!future.isDone()) {
+ if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+ break;
+ }
+ }
+
try {
future.get();
} catch (Exception e) {
- logger.error("stream build failed", e);
throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
}
queue.clear();
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index 4fc3be2..f635af2 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -89,6 +89,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
last = new SplitThread();
splits.add(last);
last.start();
+ logger.info("Split #" + splits.size() + " kickoff");
}
eof = feedSomeInput(input, last, unitRows);
@@ -102,11 +103,19 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
merger.mergeAndOutput(splits, output);
- } catch (InterruptedException e) {
- throw new IOException(e);
+ } catch (Throwable e) {
+ logger.error("Dogged Cube Build error", e);
+ if (e instanceof Error)
+ throw (Error) e;
+ else if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new IOException(e);
} finally {
closeGirdTables(splits);
logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
+ ensureExit(splits);
+ logger.info("Dogged Cube Build return");
}
}
@@ -124,6 +133,19 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
}
+ private void ensureExit(List<SplitThread> splits) throws IOException {
+ try {
+ for (int i = 0; i < splits.size(); i++) {
+ SplitThread split = splits.get(i);
+ if (split.isAlive()) {
+ abort(splits);
+ }
+ }
+ } catch (Throwable e) {
+ logger.error("Dogged Cube Build error", e);
+ }
+ }
+
private void checkException(List<SplitThread> splits) throws IOException {
for (int i = 0; i < splits.size(); i++) {
SplitThread split = splits.get(i);
@@ -161,7 +183,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
logger.error("Exception during in-mem cube build", t);
throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
}
-
}
private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 7576b04..f2ecad1 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -626,12 +626,12 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
} else if (flatTableIdx == null) {
value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
} else if (flatTableIdx.length == 1) {
- value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+ value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
} else {
byte[] result = null;
for (int x = 0; x < flatTableIdx.length; x++) {
- byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+ byte[] split = toBytes(row.get(flatTableIdx[x]));
if (result == null) {
result = Arrays.copyOf(split, split.length);
} else {
@@ -648,5 +648,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return values;
}
+ private byte[] toBytes(String v) {
+ return v == null ? null : Bytes.toBytes(v);
+ }
+
}
}