You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/05 09:16:36 UTC

incubator-kylin git commit: minor, make InMemCubeBuilder not enforce output order by default

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 4f7b1bffa -> d1a2682a5


minor, make InMemCubeBuilder not enforce output order by default


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1a2682a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1a2682a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1a2682a

Branch: refs/heads/0.8.0
Commit: d1a2682a520207ac4a95ccafe8fafdb73827f8c8
Parents: 4f7b1bf
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jun 5 15:15:49 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jun 5 15:16:21 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 286 +++++++++++++------
 1 file changed, 194 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1a2682a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 650fdb3..5833b0f 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,12 +16,32 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.*;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -34,16 +54,19 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.IGTStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  */
@@ -73,13 +96,11 @@ public class InMemCubeBuilder implements Runnable {
     private Throwable[] taskThreadExceptions;
     private TreeSet<CuboidTask> taskPending;
     private AtomicInteger taskCuboidCompleted;
-    private CuboidResult baseResult;
 
-    private SortedMap<Long, CuboidResult> outputPending;
-    private Thread outputThread;
-    private Throwable outputThreadException;
+    private OutputThread outputThread;
     private int outputCuboidExpected;
-
+    private boolean outputOrderRequired;
+    private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
 
     public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
@@ -140,6 +161,10 @@ public class InMemCubeBuilder implements Runnable {
         this.taskThreadCount = n;
     }
 
+    public void setOutputOrder(boolean required) {
+        this.outputOrderRequired = required;
+    }
+
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
 
@@ -183,10 +208,8 @@ public class InMemCubeBuilder implements Runnable {
         taskThreadExceptions = new Throwable[taskThreadCount];
 
         // output goes in a separate thread to leverage any async-ness
-        outputPending = prepareOutputPending();
-        outputCuboidExpected = outputPending.size();
-        outputThread = prepareOutputThread();
-        outputThreadException = null;
+        outputThread = new OutputThread();
+        outputCuboidExpected = outputThread.getOutputCuboidExpected();
 
         // build base cuboid
         baseResult = createBaseCuboid();
@@ -233,8 +256,8 @@ public class InMemCubeBuilder implements Runnable {
             if (t != null)
                 errors.add(t);
         }
-        if (outputThreadException != null) {
-            errors.add(outputThreadException);
+        if (outputThread.getException() != null) {
+            errors.add(outputThread.getException());
         }
         if (errors.isEmpty()) {
             return;
@@ -286,6 +309,7 @@ public class InMemCubeBuilder implements Runnable {
 
                     CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
                     addChildTasks(newCuboid);
+                    task.parent.markOneSpanningDone();
                     taskCuboidCompleted.incrementAndGet();
 
                     if (taskCuboidCompleted.get() == outputCuboidExpected) {
@@ -323,50 +347,6 @@ public class InMemCubeBuilder implements Runnable {
         }
     }
 
-    private SortedMap<Long, CuboidResult> prepareOutputPending() {
-        TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
-        prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
-        return Collections.synchronizedSortedMap(result);
-    }
-
-    private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, CuboidResult> result) {
-        result.put(cuboidId, new CuboidResult(cuboidId, null, 0, 0, 0));
-        for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
-            prepareOutputPendingRecursive(child, result);
-        }
-    }
-
-    private Thread prepareOutputThread() {
-        return new Thread("CuboidOutput") {
-            public void run() {
-                try {
-                    while (!outputPending.isEmpty()) {
-                        CuboidResult result = outputPending.get(outputPending.firstKey());
-                        synchronized (result) {
-                            while (result.table == null && taskHasNoException()) {
-                                try {
-                                    result.wait(60000);
-                                } catch (InterruptedException e) {
-                                    logger.error("interrupted", e);
-                                }
-                            }
-                        }
-
-                        // if task error occurs
-                        if (result.table == null)
-                            break;
-
-                        outputCuboid(result.cuboidId, result.table);
-                        outputPending.remove(result.cuboidId);
-                    }
-                } catch (Throwable ex) {
-                    logger.error("output thread exception", ex);
-                    outputThreadException = ex;
-                }
-            }
-        };
-    }
-
     private int getSystemAvailMB() {
         Runtime.getRuntime().gc();
         try {
@@ -434,18 +414,12 @@ public class InMemCubeBuilder implements Runnable {
         return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
     }
 
-    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int mbBaseAggrCache) {
-        CuboidResult result = outputPending.get(cuboidId);
-        result.table = table;
-        result.nRows = nRows;
-        result.timeSpent = timeSpent;
-        result.aggrCacheMB = mbBaseAggrCache;
-        if (result.aggrCacheMB <= 0) {
-            result.aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
-        }
-        synchronized (result) {
-            result.notify();
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+        if (aggrCacheMB <= 0) {
+            aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
         }
+        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+        outputThread.addOutput(result);
         return result;
     }
 
@@ -579,23 +553,14 @@ public class InMemCubeBuilder implements Runnable {
         }
     }
 
-    private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
-        long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            this.outputWriter.write(cuboidId, record);
-        }
-        scanner.close();
-        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-
-        closeStore(gridTable);
-    }
-
-    private void closeStore(GridTable gt) throws IOException {
+    private void closeStore(GridTable gt) {
         IGTStore store = gt.getStore();
         if (store instanceof Closeable) {
-            ((Closeable) store).close();
+            try {
+                ((Closeable) store).close();
+            } catch (IOException e) {
+                logger.warn("Close " + store + " exception", e);
+            }
         }
     }
 
@@ -617,13 +582,15 @@ public class InMemCubeBuilder implements Runnable {
         }
     }
 
-    private static class CuboidResult {
+    private class CuboidResult {
         long cuboidId;
         GridTable table;
         int nRows;
         @SuppressWarnings("unused")
         long timeSpent;
         int aggrCacheMB;
+        boolean outputDone;
+        int spanningDone;
 
         public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
             this.cuboidId = cuboidId;
@@ -632,6 +599,141 @@ public class InMemCubeBuilder implements Runnable {
             this.timeSpent = timeSpent;
             this.aggrCacheMB = aggrCacheMB;
         }
+
+        synchronized void markOutputDone() {
+            outputDone = true;
+            closeIfAllDone();
+        }
+
+        synchronized void markOneSpanningDone() {
+            spanningDone++;
+            closeIfAllDone();
+        }
+
+        private void closeIfAllDone() {
+            if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) {
+                closeStore(table);
+            }
+        }
+    }
+
+    // ============================================================================
+
+    private class OutputThread extends Thread {
+        private SortedMap<Long, Long> outputSequence; // synchronized sorted map
+        private LinkedBlockingDeque<CuboidResult> outputPending;
+        private int outputCount;
+        private int outputCuboidExpected;
+        private Throwable outputThreadException;
+
+        OutputThread() {
+            super("CuboidOutput");
+            this.outputSequence = prepareOutputSequence();
+            this.outputPending = new LinkedBlockingDeque<CuboidResult>();
+            this.outputCount = 0;
+            this.outputCuboidExpected = outputSequence.size();
+
+            if (outputOrderRequired == false)
+                outputSequence = null;
+        }
+
+        public int getOutputCuboidExpected() {
+            return outputCuboidExpected;
+        }
+
+        private SortedMap<Long, Long> prepareOutputSequence() {
+            TreeMap<Long, Long> result = new TreeMap<Long, Long>();
+            prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
+            return Collections.synchronizedSortedMap(result);
+        }
+
+        private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
+            result.put(cuboidId, cuboidId);
+            for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
+                prepareOutputPendingRecursive(child, result);
+            }
+        }
+
+        public void addOutput(CuboidResult result) {
+            // if output is NOT ordered
+            if (outputSequence == null) {
+                outputPending.addLast(result);
+            }
+            // if output is ordered
+            else {
+                Long cuboidId = outputSequence.get(result.cuboidId);
+                synchronized (cuboidId) {
+                    outputPending.addFirst(result);
+                    cuboidId.notify();
+                }
+            }
+        }
+
+        private CuboidResult nextOutput() throws InterruptedException {
+            CuboidResult result = null;
+
+            // if output is NOT ordered
+            if (outputSequence == null) {
+                while (result == null && taskHasNoException()) {
+                    result = outputPending.pollFirst(60, TimeUnit.SECONDS);
+                }
+            }
+            // if output is ordered
+            else {
+                Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
+                synchronized (nextCuboidId) {
+                    while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
+                        nextCuboidId.wait(60000);
+                    }
+                }
+                outputSequence.remove(result.cuboidId);
+            }
+
+            return result;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (outputCount < outputCuboidExpected) {
+                    CuboidResult result = nextOutput();
+
+                    // if task error occurs
+                    if (result == null || result.table == null)
+                        break;
+
+                    outputCuboid(result.cuboidId, result.table);
+                    outputCount++;
+                    result.markOutputDone();
+                }
+            } catch (Throwable ex) {
+                logger.error("output thread exception", ex);
+                outputThreadException = ex;
+            }
+        }
+
+        private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
+            long startTime = System.currentTimeMillis();
+            GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+            IGTScanner scanner = gridTable.scan(req);
+            for (GTRecord record : scanner) {
+                outputWriter.write(cuboidId, record);
+            }
+            scanner.close();
+            logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+        }
+
+        private CuboidResult findPendingOutput(Long cuboidId) {
+            for (CuboidResult r : outputPending) {
+                if (r.cuboidId == cuboidId)
+                    return r;
+            }
+            return null;
+        }
+
+        public Throwable getException() {
+            return outputThreadException;
+        }
     }
 
     // ============================================================================