You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/23 12:01:41 UTC

incubator-kylin git commit: KYLIN-803 function done but merge output is very slow, need refactoring

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 10f075d63 -> d02016944


KYLIN-803 function done but merge output is very slow, need refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d0201694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d0201694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d0201694

Branch: refs/heads/0.8
Commit: d0201694446e7665d937287ea2d24b738a2869df
Parents: 10f075d
Author: Yang Li <li...@apache.org>
Authored: Tue Jun 23 07:19:27 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Jun 23 18:00:24 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      |   5 +
 .../common/util/LocalFileMetadataTestCase.java  |   4 +
 .../job/inmemcubing/DoggedCubeBuilder.java      | 253 ++++++++++++++++---
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |   4 +-
 .../job/inmemcubing/DoggedCubeBuilderTest.java  | 156 ++++++++++++
 .../job/inmemcubing/InMemCubeBuilderTest.java   |  68 ++---
 .../rest/controller/UserControllerTest.java     |   2 +-
 .../kylin/rest/service/CacheServiceTest.java    |   2 +-
 .../kylin/rest/service/ServiceTestBase.java     |   2 +-
 .../storage/cube/CuboidToGridTableMapping.java  |  16 +-
 10 files changed, 436 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index 251aed1..50babd9 100644
--- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -120,6 +120,11 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
         }
         return size;
     }
+    
+    @Override
+    public String toString() {
+        return "" + getCountEstimate();
+    }
 
     // ============================================================================
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
index 1bd218e..6939e20 100644
--- a/common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
+++ b/common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
@@ -32,6 +32,10 @@ public class LocalFileMetadataTestCase extends AbstractKylinTestCase {
 
     @Override
     public void createTestMetadata() {
+        staticCreateTestMetadata();
+    }
+    
+    public static void staticCreateTestMetadata() {
         staticCreateTestMetadata(LOCALMETA_TEST_DATA);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
index 02f24f7..c451692 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
@@ -19,20 +19,28 @@ package org.apache.kylin.job.inmemcubing;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * When base cuboid does not fit in memory, cut the input into multiple splits and merge the split outputs at last.
  */
@@ -40,10 +48,18 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
 
+    private int splitRowThreshold = Integer.MAX_VALUE;
+    private int unitRows = 1000;
+
     public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
     }
 
+    public void setSplitRowThreshold(int rowThreshold) {
+        this.splitRowThreshold = rowThreshold;
+        this.unitRows = Math.min(unitRows, rowThreshold);
+    }
+
     @Override
     public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
         new BuildOnce().build(input, output);
@@ -51,9 +67,10 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
     private class BuildOnce {
 
+        final List<SplitThread> splits = new ArrayList<SplitThread>();
+        final Merger merger = new Merger();
+
         public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-            List<SplitThread> splits = new ArrayList<SplitThread>();
-            Merger merger = new Merger();
             SplitThread last = null;
             boolean eof = false;
 
@@ -63,20 +80,20 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     cutSplit(last);
                     last = null;
                 }
-                
+
                 checkException(splits);
-                
+
                 if (last == null) {
-                    last = new SplitThread(merger.newMergeSlot());
+                    last = new SplitThread(merger);
                     splits.add(last);
                     last.start();
                 }
-                
-                eof = feedSomeInput(input, last, 1000);
+
+                eof = feedSomeInput(input, last, unitRows);
             }
-            
+
             merger.mergeAndOutput(splits, output);
-            
+
             checkException(splits);
         }
 
@@ -92,7 +109,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             for (SplitThread split : splits) {
                 split.builder.abort();
             }
-            
+
             ArrayList<Throwable> errors = new ArrayList<Throwable>();
             for (SplitThread split : splits) {
                 try {
@@ -103,7 +120,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 if (split.exception != null)
                     errors.add(split.exception);
             }
-            
+
             if (errors.isEmpty()) {
                 return;
             } else if (errors.size() == 1) {
@@ -126,18 +143,19 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 while (i < n) {
                     List<String> record = input.take();
                     i++;
-                    
+
                     while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) {
                         if (split.exception != null)
                             return true; // got some error
                     }
-                    
+                    split.inputRowCount++;
+
                     if (record == null || record.isEmpty()) {
                         return true;
                     }
                 }
                 return false;
-                
+
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
@@ -152,7 +170,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     }
                     Thread.sleep(1000);
                 }
-                
+
                 // wait cuboid build done (but still pending output)
                 while (last.isAlive()) {
                     if (last.builder.isAllCuboidDone()) {
@@ -166,25 +184,31 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         }
 
         private boolean shouldCutSplit() {
-            return MemoryBudgetController.getSystemAvailMB() <= reserveMemoryMB;
+            int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
+            int nSplit = splits.size();
+            long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
+            
+            logger.debug(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
+            
+            return splitRowCount >= splitRowThreshold || systemAvailMB <= reserveMemoryMB;
         }
     }
 
     private class SplitThread extends Thread {
-        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(64);
-        final BlockingQueue<Pair<Long, GTRecord>> outputQueue = new ArrayBlockingQueue<Pair<Long, GTRecord>>(64);
+        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
         final InMemCubeBuilder builder;
         final MergeSlot output;
 
+        long inputRowCount = 0;
         RuntimeException exception;
 
-        public SplitThread(MergeSlot output) {
+        public SplitThread(Merger merger) {
             this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
             this.builder.setConcurrentThreads(taskThreadCount);
-            this.builder.setOutputOrder(true); // sort merge requires order
+            this.builder.setOutputOrder(true); // merge sort requires order
             this.builder.setReserveMemoryMB(reserveMemoryMB);
-            
-            this.output = output;
+
+            this.output = merger.newMergeSlot(this);
         }
 
         @Override
@@ -199,39 +223,186 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             }
         }
     }
-    
+
     private class Merger {
-        
-        public MergeSlot newMergeSlot() {
-            return new MergeSlot();
+
+        MeasureAggregators reuseAggrs;
+        Object[] reuseMetricsArray;
+        ByteArray reuseMetricsSpace;
+
+        long lastCuboidColumnCount;
+        ImmutableBitSet lastMetricsColumns;
+
+        Merger() {
+            MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
+            reuseAggrs = new MeasureAggregators(measures);
+            reuseMetricsArray = new Object[measures.length];
         }
 
-        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) {
-            // TODO
+        public MergeSlot newMergeSlot(SplitThread split) {
+            return new MergeSlot(split);
+        }
+
+        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
+            LinkedList<MergeSlot> open = Lists.newLinkedList();
+            for (SplitThread split : splits)
+                open.add(split.output);
+
+            if (splits.size() == 1) {
+                splits.get(0).output.directOutput = output;
+            }
+
+            try {
+                PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
+                boolean hasMore = true;
+
+                while (hasMore) {
+                    takeRecordsFromAllOpenSlots(open, heap);
+                    hasMore = mergeAndOutputOneRecord(heap, open, output);
+                }
+
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void takeRecordsFromAllOpenSlots(LinkedList<MergeSlot> open, PriorityQueue<MergeSlot> heap) throws InterruptedException {
+            while (!open.isEmpty()) {
+                MergeSlot slot = open.getFirst();
+                // ready one record in the slot
+                if (slot.readySignal.poll(1, TimeUnit.SECONDS) != null) {
+                    open.removeFirst();
+                    heap.add(slot);
+                } else if (slot.isClosed()) {
+                    open.removeFirst();
+                }
+            }
+            return;
+        }
+
+        private boolean mergeAndOutputOneRecord(PriorityQueue<MergeSlot> heap, LinkedList<MergeSlot> open, ICuboidWriter output) throws IOException, InterruptedException {
+            MergeSlot smallest = heap.poll();
+            if (smallest == null)
+                return false;
+            open.add(smallest);
+
+            if (smallest.isSameKey(heap.peek())) {
+                Object[] metrics = getMetricsValues(smallest.record);
+                reuseAggrs.reset();
+                reuseAggrs.aggregate(metrics);
+                do {
+                    MergeSlot slot = heap.poll();
+                    open.add(slot);
+                    metrics = getMetricsValues(slot.record);
+                    reuseAggrs.aggregate(metrics);
+                } while (smallest.isSameKey(heap.peek()));
+                
+                reuseAggrs.collectStates(metrics);
+                setMetricsValues(smallest.record, metrics);
+            }
+
+            output.write(smallest.cuboidId, smallest.record);
+
+            for (MergeSlot slot : open) {
+                slot.consumedSignal.put(this);
+            }
+            return true;
+        }
+
+        private void setMetricsValues(GTRecord record, Object[] metricsValues) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+
+            if (reuseMetricsSpace == null) {
+                reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
+            }
+
+            record.setValues(metrics, reuseMetricsSpace, metricsValues);
+        }
+
+        private Object[] getMetricsValues(GTRecord record) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+            return record.getValues(metrics, reuseMetricsArray);
+        }
+
+        private ImmutableBitSet getMetricsColumns(GTRecord record) {
+            // metrics columns always come after dimension columns
+            if (lastCuboidColumnCount == record.getInfo().getColumnCount())
+                return lastMetricsColumns;
+
+            int to = record.getInfo().getColumnCount();
+            int from = to - reuseMetricsArray.length;
+            lastCuboidColumnCount = record.getInfo().getColumnCount();
+            lastMetricsColumns = new ImmutableBitSet(from, to);
+            return lastMetricsColumns;
         }
     }
-    
-    private static class MergeSlot implements ICuboidWriter {
-        
-        BlockingQueue<MergeSlot> queue = new ArrayBlockingQueue<MergeSlot>(1);
+
+    private static class MergeSlot implements ICuboidWriter, Comparable<MergeSlot> {
+
+        final SplitThread split;
+        final BlockingQueue<Object> readySignal = new ArrayBlockingQueue<Object>(1);
+        final BlockingQueue<Object> consumedSignal = new ArrayBlockingQueue<Object>(1);
+
+        ICuboidWriter directOutput = null;
         long cuboidId;
         GTRecord record;
-        
+
+        public MergeSlot(SplitThread split) {
+            this.split = split;
+        }
+
         @Override
         public void write(long cuboidId, GTRecord record) throws IOException {
+            // when there is only one split, write directly to the final output
+            if (directOutput != null) {
+                directOutput.write(cuboidId, record);
+                return;
+            }
+
             this.cuboidId = cuboidId;
             this.record = record;
-        
+
             try {
-                // deliver the record
-                queue.put(this);
-                
-                // confirm merger consumed (took) the record
-                queue.put(this);
-                
+                // signal record is ready
+                readySignal.put(this);
+
+                // wait for the record to be consumed
+                consumedSignal.take();
+
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         }
+
+        public boolean isClosed() {
+            return split.isAlive() == false;
+        }
+
+        @Override
+        public int compareTo(MergeSlot o) {
+            long cuboidComp = this.cuboidId - o.cuboidId;
+            if (cuboidComp != 0)
+                return cuboidComp < 0 ? -1 : 1;
+            else
+                return this.record.compareTo(o.record);
+        }
+
+        public boolean isSameKey(MergeSlot o) {
+            if (o == null)
+                return false;
+
+            if (this.cuboidId != o.cuboidId)
+                return false;
+
+            // note GTRecord.equals() doesn't work because the two GTRecords come from different GridTables
+            ImmutableBitSet pk = this.record.getInfo().getPrimaryKey();
+            for (int i = 0; i < pk.trueBitCount(); i++) {
+                int c = pk.trueBitAt(i);
+                if (this.record.get(c).equals(o.record.get(c)) == false)
+                    return false;
+            }
+            return true;
+        }
+
     };
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 53d7486..8d0b0fb 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -86,7 +86,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private Thread[] taskThreads;
     private Throwable[] taskThreadExceptions;
     private TreeSet<CuboidTask> taskPending;
-    private AtomicInteger taskCuboidCompleted;
+    private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
 
     private OutputThread outputThread;
     private int outputCuboidExpected;
@@ -159,7 +159,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
         // multiple threads to compute cuboid in parallel
         taskPending = new TreeSet<CuboidTask>();
-        taskCuboidCompleted = new AtomicInteger(0);
+        taskCuboidCompleted.set(0);
         taskThreads = prepareTaskThreads();
         taskThreadExceptions = new Throwable[taskThreadCount];
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
new file mode 100644
index 0000000..5c19df3
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
@@ -0,0 +1,156 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 10000;
+    private static final int SPLIT_ROWS = 5000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        long randSeed = 101;
+
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        doggedBuilder.setOutputOrder(true);
+        doggedBuilder.setConcurrentThreads(THREADS);
+        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+        FileRecordWriter doggedResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            doggedResult.close();
+        }
+
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        inmemBuilder.setOutputOrder(true);
+        inmemBuilder.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult = new FileRecordWriter();
+
+        {
+            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            inmemResult.close();
+        }
+
+        fileCompare(doggedResult.file, inmemResult.file);
+        doggedResult.file.delete();
+        inmemResult.file.delete();
+    }
+
+    private void fileCompare(File file, File file2) throws IOException {
+        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+        String line1, line2;
+        do {
+            line1 = r1.readLine();
+            line2 = r2.readLine();
+            
+            assertEquals(line1, line2);
+            
+        } while (line1 != null || line2 != null);
+
+        r1.close();
+        r2.close();
+    }
+
+    class FileRecordWriter implements ICuboidWriter {
+
+        File file;
+        PrintWriter writer;
+
+        FileRecordWriter() throws IOException {
+            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+            writer = new PrintWriter(file, "UTF-8");
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(record.toString());
+            writer.println();
+        }
+
+        public void close() {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 34e37f2..2a4cf8a 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -42,8 +42,8 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.FileTableReader;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,42 +57,45 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
 
-    private KylinConfig kylinConfig;
-    private CubeManager cubeManager;
-
-    @Before
-    public void before() {
-        createTestMetadata();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        cubeManager = CubeManager.getInstance(kylinConfig);
+    private static final int INPUT_ROWS = 70000;
+    private static final int THREADS = 4;
+    
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+    
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+        
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = getDictionaryMap(cube, flatTable);
     }
 
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
     }
 
     @Test
     public void test() throws Exception {
-        final int inputRows = 70000;
-        final int threads = 4;
-
-        final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-
-        Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
 
         InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        cubeBuilder.setConcurrentThreads(threads);
+        cubeBuilder.setOutputOrder(true);
+        cubeBuilder.setConcurrentThreads(THREADS);
+        
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
         try {
             // round 1
             {
                 Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, inputRows);
+                feedData(cube, flatTable, queue, INPUT_ROWS);
                 future.get();
             }
             
@@ -106,7 +109,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             // round 3
             {
                 Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, inputRows);
+                feedData(cube, flatTable, queue, INPUT_ROWS);
                 future.get();
             }
             
@@ -116,7 +119,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         }
     }
 
-    private void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, 0);
+    }
+    
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
         CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
         int nColumns = flatTableDesc.getColumnList().size();
 
@@ -139,8 +146,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
         }
 
-        // output with random data
         Random rand = new Random();
+        if (randSeed != 0)
+            rand.setSeed(randSeed);
+        
+        // output with random data
         for (; count > 0; count--) {
             ArrayList<String> row = new ArrayList<String>(nColumns);
             for (int i = 0; i < nColumns; i++) {
@@ -152,7 +162,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         queue.put(new ArrayList<String>(0));
     }
 
-    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+    static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
         Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
         CubeDesc desc = cube.getDescriptor();
         CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
@@ -171,7 +181,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         return result;
     }
 
-    private List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
         List<byte[]> result = Lists.newArrayList();
         FileTableReader reader = new FileTableReader(flatTable, nColumns);
         while (reader.next()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
index 68df596..ffd7b45 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
@@ -43,7 +43,7 @@ public class UserControllerTest extends ServiceTestBase {
 
     @BeforeClass
     public static void setupResource() {
-        staticCreateTestMetadata(LOCALMETA_TEST_DATA);
+        staticCreateTestMetadata();
         List<GrantedAuthority> authorities = new ArrayList<GrantedAuthority>();
         User user = new User("ADMIN", "ADMIN", authorities);
         Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 3ccef39..9f1549e 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -63,7 +63,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        staticCreateTestMetadata(LOCALMETA_TEST_DATA);
+        staticCreateTestMetadata();
         configA = KylinConfig.getInstanceFromEnv();
         configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
         configB.setMetadataUrl("../examples/test_metadata");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index 699c008..064badb 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -45,7 +45,7 @@ public class ServiceTestBase extends LocalFileMetadataTestCase {
 
     @BeforeClass
     public static void setupResource() throws Exception {
-        staticCreateTestMetadata(LOCALMETA_TEST_DATA);
+        staticCreateTestMetadata();
         Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN");
         SecurityContextHolder.getContext().setAuthentication(authentication);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0201694/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
index 82abf57..0bf4573 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
@@ -7,6 +7,7 @@ import java.util.Map;
 
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.metadata.model.DataType;
@@ -37,7 +38,7 @@ public class CuboidToGridTableMapping {
         this.cuboid = cuboid;
         init();
     }
-
+    
     private void init() {
         int gtColIdx = 0;
         gtDataTypes = Lists.newArrayList();
@@ -155,4 +156,17 @@ public class CuboidToGridTableMapping {
         }
         return result.isEmpty() ? Collections.<Integer, Integer>emptyMap() : result;
     }
+
+    public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) { // returns the cube's measures in HBase mapping order: column family -> column -> measure
+        MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()]; // sized from the cube's measure list, filled from the HBase mapping below
+        int i = 0; // next free slot in result
+        for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc m : hbaseColDesc.getMeasures()) {
+                    result[i++] = m; // NOTE(review): assumes the mapping lists exactly cube.getMeasures().size() measures; more would overflow the array, fewer would leave trailing nulls -- confirm
+                }
+            }
+        }
+        return result;
+    }
 }