You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/24 04:39:53 UTC

incubator-kylin git commit: KYLIN-803 half way

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-803 [created] 0d5398853


KYLIN-803 half way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0d539885
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0d539885
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0d539885

Branch: refs/heads/KYLIN-803
Commit: 0d539885343b3a2b7f64e2bd9787cb47deb8f134
Parents: d020169
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 24 10:39:23 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jun 24 10:39:23 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/cuboid/CuboidScheduler.java      |  14 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |  64 +++++-
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 206 ++-----------------
 .../kylin/storage/gridtable/GridTable.java      |  10 +-
 4 files changed, 96 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 07be092..bebfd08 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.cube.cuboid;
 
 /** 
- * @author George Song (ysong1)
- * 
  */
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +40,18 @@ public class CuboidScheduler {
         this.max = (long) Math.pow(2, size) - 1;
         this.cache = new ConcurrentHashMap<Long, List<Long>>();
     }
+    
+    public int getCuboidCount() {
+        return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+    }
+
+    private int getCuboidCount(long cuboidId) {
+        int r = 1;
+        for (Long child : getSpanningCuboid(cuboidId)) {
+            r += getCuboidCount(child);
+        }
+        return r;
+    }
 
     public List<Long> getSpanningCuboid(long cuboid) {
         if (cuboid > max || cuboid < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
index 034c4cd..cf7c356 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
@@ -19,17 +19,26 @@ package org.apache.kylin.job.inmemcubing;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An interface alike abstract class. Hold common tunable parameters and nothing more.
  */
 abstract public class AbstractInMemCubeBuilder {
 
+    private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder .class);
+    
     final protected CubeDesc cubeDesc;
     final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
     
@@ -72,5 +81,58 @@ abstract public class AbstractInMemCubeBuilder {
         };
     }
     
-    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter gtRecordWriter) throws IOException;
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        TreeMap<Long, CuboidResult> result = build(input);
+        for (CuboidResult cuboidResult : result.values()) {
+            outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+            cuboidResult.table.close();
+        }
+    }
+    
+    private void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        for (GTRecord record : scanner) {
+            output.write(cuboidId, record);
+        }
+        scanner.close();
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+    
+    public TreeMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+        final TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
+        ICuboidCollector collector = new ICuboidCollector() {
+            @Override
+            public void collect(CuboidResult cuboidResult) {
+                result.put(cuboidResult.cuboidId, cuboidResult);
+            }
+        };
+        build(input, collector);
+        return result;
+    }
+    
+    abstract public void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException;
+    
+    public static interface ICuboidCollector {
+        public void collect(CuboidResult result);
+    }
+    
+    public static class CuboidResult {
+        public long cuboidId;
+        public GridTable table;
+        public int nRows;
+        public long timeSpent;
+        public int aggrCacheMB;
+
+        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+            this.cuboidId = cuboidId;
+            this.table = table;
+            this.nRows = nRows;
+            this.timeSpent = timeSpent;
+            this.aggrCacheMB = aggrCacheMB;
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 8d0b0fb..996e747 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,21 +16,15 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.io.DoubleWritable;
@@ -57,7 +51,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.GridTable;
 import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.IGTStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,8 +66,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
     private static final LongWritable ONE = new LongWritable(1l);
 
-    private final long baseCuboidId;
     private final CuboidScheduler cuboidScheduler;
+    private final long baseCuboidId;
+    private final int totalCuboidCount;
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
@@ -88,15 +82,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private TreeSet<CuboidTask> taskPending;
     private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
 
-    private OutputThread outputThread;
-    private int outputCuboidExpected;
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
+    private ICuboidCollector resultCollector;
 
     public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
         this.cuboidScheduler = new CuboidScheduler(cubeDesc);
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        this.totalCuboidCount = cuboidScheduler.getCuboidCount();
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
 
@@ -153,7 +147,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     @Override
-    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+    public void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build start, " + cubeDesc.getName());
 
@@ -163,14 +157,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         taskThreads = prepareTaskThreads();
         taskThreadExceptions = new Throwable[taskThreadCount];
 
-        // output goes in a separate thread to leverage any async-ness
-        outputThread = new OutputThread(output);
-        outputCuboidExpected = outputThread.getOutputCuboidExpected();
-
         // build base cuboid
+        resultCollector = collector;
         totalSumForSanityCheck = null;
         baseResult = createBaseCuboid(input);
-        taskCuboidCompleted.incrementAndGet();
         if (baseResult.nRows == 0)
             return;
 
@@ -180,11 +170,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         // kick off N-D cuboid tasks and output
         addChildTasks(baseResult);
         start(taskThreads);
-        start(outputThread);
 
         // wait complete
         join(taskThreads);
-        join(outputThread);
 
         long endTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
@@ -194,7 +182,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     
     public void abort() {
         interrupt(taskThreads);
-        interrupt(outputThread);
     }
 
     private void start(Thread... threads) {
@@ -223,9 +210,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             if (t != null)
                 errors.add(t);
         }
-        if (outputThread.getException() != null) {
-            errors.add(outputThread.getException());
-        }
         if (errors.isEmpty()) {
             return;
         } else if (errors.size() == 1) {
@@ -250,7 +234,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     public boolean isAllCuboidDone() {
-        return taskCuboidCompleted.get() == outputCuboidExpected;
+        return taskCuboidCompleted.get() == totalCuboidCount;
     }
     
     private class CuboidTaskThread extends Thread {
@@ -280,8 +264,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
                     CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
                     addChildTasks(newCuboid);
-                    task.parent.markOneSpanningDone();
-                    taskCuboidCompleted.incrementAndGet();
 
                     if (isAllCuboidDone()) {
                         for (Thread t : taskThreads) {
@@ -387,8 +369,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         if (aggrCacheMB <= 0) {
             aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
         }
+        
         CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
-        outputThread.addOutput(result);
+        taskCuboidCompleted.incrementAndGet();
+        
+        resultCollector.collect(result);
         return result;
     }
 
@@ -397,7 +382,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
             @Override
             public int freeUp(int mb) {
-                return 0; // cannot free up
+                return 0; // cannot free up on demand
             }
 
             @Override
@@ -493,17 +478,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    private void closeStore(GridTable gt) {
-        IGTStore store = gt.getStore();
-        if (store instanceof Closeable) {
-            try {
-                ((Closeable) store).close();
-            } catch (IOException e) {
-                logger.warn("Close " + store + " exception", e);
-            }
-        }
-    }
-
     // ===========================================================================
 
     private static class CuboidTask implements Comparable<CuboidTask> {
@@ -522,162 +496,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
     }
 
-    private class CuboidResult {
-        long cuboidId;
-        GridTable table;
-        int nRows;
-        @SuppressWarnings("unused")
-        long timeSpent;
-        int aggrCacheMB;
-        boolean outputDone;
-        int spanningDone;
-
-        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
-            this.cuboidId = cuboidId;
-            this.table = table;
-            this.nRows = nRows;
-            this.timeSpent = timeSpent;
-            this.aggrCacheMB = aggrCacheMB;
-        }
-
-        synchronized void markOutputDone() {
-            outputDone = true;
-            closeIfAllDone();
-        }
-
-        synchronized void markOneSpanningDone() {
-            spanningDone++;
-            closeIfAllDone();
-        }
-
-        private void closeIfAllDone() {
-            if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) {
-                closeStore(table);
-            }
-        }
-    }
-
-    // ============================================================================
-
-    private class OutputThread extends Thread {
-        private ICuboidWriter output;
-        private SortedMap<Long, Long> outputSequence; // synchronized sorted map
-        private LinkedBlockingDeque<CuboidResult> outputPending;
-        private int outputCount;
-        private int outputCuboidExpected;
-        private Throwable outputThreadException;
-
-        OutputThread(ICuboidWriter output) {
-            super("CuboidOutput");
-            this.output = output;
-            this.outputSequence = prepareOutputSequence();
-            this.outputPending = new LinkedBlockingDeque<CuboidResult>();
-            this.outputCount = 0;
-            this.outputCuboidExpected = outputSequence.size();
-
-            if (outputOrderRequired == false)
-                outputSequence = null;
-        }
-
-        public int getOutputCuboidExpected() {
-            return outputCuboidExpected;
-        }
-
-        private SortedMap<Long, Long> prepareOutputSequence() {
-            TreeMap<Long, Long> result = new TreeMap<Long, Long>();
-            prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
-            return Collections.synchronizedSortedMap(result);
-        }
-
-        private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, Long> result) {
-            result.put(cuboidId, cuboidId);
-            for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
-                prepareOutputPendingRecursive(child, result);
-            }
-        }
-
-        public void addOutput(CuboidResult result) {
-            // if output is NOT ordered
-            if (outputSequence == null) {
-                outputPending.addLast(result);
-            }
-            // if output is ordered
-            else {
-                Long cuboidId = outputSequence.get(result.cuboidId);
-                synchronized (cuboidId) {
-                    outputPending.addFirst(result);
-                    cuboidId.notify();
-                }
-            }
-        }
-
-        private CuboidResult nextOutput() throws InterruptedException {
-            CuboidResult result = null;
-
-            // if output is NOT ordered
-            if (outputSequence == null) {
-                while (result == null && taskHasNoException()) {
-                    result = outputPending.pollFirst(60, TimeUnit.SECONDS);
-                }
-            }
-            // if output is ordered
-            else {
-                Long nextCuboidId = outputSequence.get(outputSequence.firstKey());
-                synchronized (nextCuboidId) {
-                    while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) {
-                        nextCuboidId.wait(60000);
-                    }
-                }
-                outputSequence.remove(result.cuboidId);
-            }
-
-            return result;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (outputCount < outputCuboidExpected) {
-                    CuboidResult result = nextOutput();
-
-                    // if task error occurs
-                    if (result == null || result.table == null)
-                        break;
-
-                    outputCuboid(result.cuboidId, result.table);
-                    outputCount++;
-                    result.markOutputDone();
-                }
-            } catch (Throwable ex) {
-                logger.error("output thread exception", ex);
-                outputThreadException = ex;
-            }
-        }
-
-        private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
-            long startTime = System.currentTimeMillis();
-            GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-            IGTScanner scanner = gridTable.scan(req);
-            for (GTRecord record : scanner) {
-                output.write(cuboidId, record);
-            }
-            scanner.close();
-            logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-        }
-
-        private CuboidResult findPendingOutput(Long cuboidId) {
-            for (CuboidResult r : outputPending) {
-                if (r.cuboidId == cuboidId)
-                    return r;
-            }
-            return null;
-        }
-
-        public Throwable getException() {
-            return outputThreadException;
-        }
-    }
-
     // ============================================================================
 
     private class InputConverter implements IGTScanner {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d539885/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index 20b543a..092227b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -1,8 +1,9 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-public class GridTable {
+public class GridTable implements Closeable {
 
     final GTInfo info;
     final IGTStore store;
@@ -50,4 +51,11 @@ public class GridTable {
     public IGTStore getStore() {
         return store;
     }
+
+    @Override
+    public void close() throws IOException {
+        if (store instanceof Closeable) {
+            ((Closeable) store).close();
+        }
+    }
 }