You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/29 11:16:35 UTC

incubator-kylin git commit: KYLIN-803 InMemCubeBuilder tolerate null measure value

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 5a4f5b979 -> 035642be6


KYLIN-803 InMemCubeBuilder tolerate null measure value


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/035642be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/035642be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/035642be

Branch: refs/heads/0.8
Commit: 035642be68cd174afba0ed89e0ba2f182853f206
Parents: 5a4f5b9
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jun 29 17:15:59 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jun 29 17:15:59 2015 +0800

----------------------------------------------------------------------
 .../job/hadoop/cubev2/InMemCuboidMapper.java    | 23 +++++++++++------
 .../job/inmemcubing/DoggedCubeBuilder.java      | 27 +++++++++++++++++---
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |  8 ++++--
 3 files changed, 45 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index e600b1d..497ee19 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -9,6 +9,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -85,10 +86,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
     public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
         // put each row to the queue
         List<String> row = HiveTableReader.getRowAsList(record);
-        queue.put(row);
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
+        while (!future.isDone()) {
+            if (queue.offer(row, 1, TimeUnit.SECONDS)) {
+                counter++;
+                if (counter % BatchConstants.COUNTER_MAX == 0) {
+                    logger.info("Handled " + counter + " records!");
+                }
+                break;
+            }
         }
     }
 
@@ -96,16 +101,18 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
     protected void cleanup(Context context) throws IOException, InterruptedException {
         logger.info("Totally handled " + counter + " records!");
 
-        queue.put(new ArrayList<String>(0));
+        while (!future.isDone()) {
+            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+                break;
+            }
+        }
+        
         try {
             future.get();
         } catch (Exception e) {
-            logger.error("stream build failed", e);
             throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
         }
         queue.clear();
-
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index 4fc3be2..f635af2 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -89,6 +89,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                         last = new SplitThread();
                         splits.add(last);
                         last.start();
+                        logger.info("Split #" + splits.size() + " kickoff");
                     }
 
                     eof = feedSomeInput(input, last, unitRows);
@@ -102,11 +103,19 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
                 merger.mergeAndOutput(splits, output);
 
-            } catch (InterruptedException e) {
-                throw new IOException(e);
+            } catch (Throwable e) {
+                logger.error("Dogged Cube Build error", e);
+                if (e instanceof Error)
+                    throw (Error) e;
+                else if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new IOException(e);
             } finally {
                 closeGirdTables(splits);
                 logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
+                ensureExit(splits);
+                logger.info("Dogged Cube Build return");
             }
         }
 
@@ -124,6 +133,19 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             }
         }
 
+        private void ensureExit(List<SplitThread> splits) throws IOException {
+            try {
+                for (int i = 0; i < splits.size(); i++) {
+                    SplitThread split = splits.get(i);
+                    if (split.isAlive()) {
+                        abort(splits);
+                    }
+                }
+            } catch (Throwable e) {
+                logger.error("Dogged Cube Build error", e);
+            }
+        }
+
         private void checkException(List<SplitThread> splits) throws IOException {
             for (int i = 0; i < splits.size(); i++) {
                 SplitThread split = splits.get(i);
@@ -161,7 +183,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     logger.error("Exception during in-mem cube build", t);
                 throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
             }
-
         }
 
         private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/035642be/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 7576b04..f2ecad1 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -626,12 +626,12 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 } else if (flatTableIdx == null) {
                     value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
                 } else if (flatTableIdx.length == 1) {
-                    value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+                    value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
                 } else {
 
                     byte[] result = null;
                     for (int x = 0; x < flatTableIdx.length; x++) {
-                        byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+                        byte[] split = toBytes(row.get(flatTableIdx[x]));
                         if (result == null) {
                             result = Arrays.copyOf(split, split.length);
                         } else {
@@ -648,5 +648,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return values;
         }
 
+        private byte[] toBytes(String v) {
+            return v == null ? null : Bytes.toBytes(v);
+        }
+        
     }
 }