You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/04 16:51:26 UTC

[1/2] kylin git commit: Revert "APACHE-KYLIN-2732: fix bug of missing records"

Repository: kylin
Updated Branches:
  refs/heads/master 43d02f473 -> 5a6e6d7e3


Revert "APACHE-KYLIN-2732: fix bug of missing records"

This reverts commit 84779827ad56673848c0e2f6b589a406dac1bce2.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d103e639
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d103e639
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d103e639

Branch: refs/heads/master
Commit: d103e639db0530cb21d3f98584fe2552ddfe8b59
Parents: 43d02f4
Author: Zhong <nj...@apache.org>
Authored: Tue Dec 5 00:37:27 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Tue Dec 5 00:37:27 2017 +0800

----------------------------------------------------------------------
 .../ConsumeBlockingQueueController.java         |   6 +-
 .../RecordConsumeBlockingQueueController.java   |  50 ++++----
 .../ConsumeBlockingQueueControllerTest.java     | 127 -------------------
 3 files changed, 27 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index 8875618..a9e55f7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -48,7 +48,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         if (hasException) {
             return false;
         }
-        if (hasNextInBuffer()) {
+        if (internalIT.hasNext()) {
             return true;
         } else {
             batchBuffer.clear();
@@ -74,10 +74,6 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         throw new UnsupportedOperationException();
     }
 
-    protected boolean hasNextInBuffer() {
-        return internalIT.hasNext();
-    }
-
     public void findException() {
         hasException = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 8c51fc6..49cbe1f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.cube.inmemcubing;
 
-import java.util.NoSuchElementException;
 import java.util.concurrent.BlockingQueue;
 
 public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> {
@@ -32,39 +31,25 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
    
     private T currentObject = null;
     private volatile boolean ifEnd = false;
-    private volatile boolean ifCut = false;
+    private volatile boolean cut = false;
+    private long outputRowCountCut = 0L;
 
     @Override
     public boolean hasNext() {
         if (currentObject != null) {
-            return true;
+            return hasNext(currentObject);
         }
-        if (ifCut) {
-            if (!super.hasNextInBuffer()) {
-                return false;
-            }
-        } else {
-            if (!super.hasNext()) {
-                return false;
-            }
-        }
-
-        currentObject = super.next();
-        if (inputConverterUnit.ifEnd(currentObject)) {
-            ifEnd = true;
+        if (!super.hasNext()) {
             return false;
-        } else if (inputConverterUnit.ifCut(currentObject)) {
-            ifCut = true;
-            currentObject = null;
-            return hasNext();
         }
-        return true;
+        currentObject = super.next();
+        return hasNext(currentObject);
     }
 
     @Override
     public T next() {
-        if (ifEnd() || currentObject == null)
-            throw new NoSuchElementException();
+        if (ifEnd())
+            throw new IllegalStateException();
 
         T result = currentObject;
         currentObject = null;
@@ -74,6 +59,18 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public boolean ifEnd() {
         return ifEnd;
     }
+
+    private boolean hasNext(T object) {
+        if (inputConverterUnit.ifEnd(object)) {
+            ifEnd = true;
+            return false;
+        }else if(cut){
+            return false;
+        }else if(inputConverterUnit.ifCut(object)){
+            return false;
+        }
+        return true;
+    }
     
     public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -84,6 +81,11 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     }
 
     public void forceCutPipe() {
-        ifCut = true;
+        cut = true;
+        outputRowCountCut = getOutputRowCount();
+    }
+
+    public long getOutputRowCountAfterCut() {
+        return getOutputRowCount() - outputRowCountCut;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
deleted file mode 100644
index 681ae62..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kylin.gridtable.GTRecord;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumeBlockingQueueControllerTest {
-    private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
-
-    @Test
-    public void testIterator() {
-        final int nRecord = 4345;
-        final int nCut = 2000;
-        final int nBatch = 60;
-
-        final BlockingQueue<String> input = new LinkedBlockingQueue<>();
-        Thread producer = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int i = 1; i <= nRecord; i++) {
-                        input.put("test");
-                        if (i % nCut == 0) {
-                            input.put(InputConverterUnitTest.CUT_ROW);
-                        }
-                    }
-                    input.put(InputConverterUnitTest.END_ROW);
-                } catch (InterruptedException e) {
-                    logger.warn("Fail to produce records into BlockingQueue due to: " + e);
-                }
-            }
-        });
-
-        final AtomicInteger nRecordConsumed = new AtomicInteger(0);
-        Thread consumer = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                int nSplit = 0;
-                while (true) {
-                    RecordConsumeBlockingQueueController blockingQueueController = RecordConsumeBlockingQueueController
-                            .getQueueController(new InputConverterUnitTest(), input, nBatch);
-                    while (blockingQueueController.hasNext()) {
-                        blockingQueueController.next();
-                        nRecordConsumed.incrementAndGet();
-                    }
-                    System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit);
-                    nSplit++;
-
-                    if (blockingQueueController.ifEnd()) {
-                        break;
-                    }
-                }
-            }
-        });
-
-        producer.start();
-        consumer.start();
-
-        try {
-            producer.join();
-            consumer.join();
-        } catch (InterruptedException e) {
-            logger.warn("Fail to join threads: " + e);
-        }
-
-        Assert.assertEquals(nRecord, nRecordConsumed.get());
-    }
-
-    private static class InputConverterUnitTest implements InputConverterUnit<String> {
-        public static final String END_ROW = new String();
-        public static final String CUT_ROW = "0";
-
-        @Override
-        public void convert(String currentObject, GTRecord record) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean ifEnd(String currentObject) {
-            return currentObject == END_ROW;
-        }
-
-        @Override
-        public boolean ifCut(String currentObject) {
-            return currentObject == CUT_ROW;
-        }
-
-        @Override
-        public String getEndRow() {
-            return END_ROW;
-        }
-
-        @Override
-        public String getCutRow() {
-            return CUT_ROW;
-        }
-
-        @Override
-        public boolean ifChange() {
-            throw new UnsupportedOperationException();
-        }
-    }
-}


[2/2] kylin git commit: APACHE-KYLIN-2732: better way to fix bug of missing records

Posted by nj...@apache.org.
APACHE-KYLIN-2732: better way to fix bug of missing records

Signed-off-by: Zhong <nj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a6e6d7e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a6e6d7e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a6e6d7e

Branch: refs/heads/master
Commit: 5a6e6d7e38342691a057345a9d51c4375df8dc8f
Parents: d103e63
Author: Wang Ken <mi...@ebay.com>
Authored: Mon Dec 4 13:13:18 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Tue Dec 5 00:38:22 2017 +0800

----------------------------------------------------------------------
 .../ConsumeBlockingQueueController.java         |   2 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |  19 ++-
 .../RecordConsumeBlockingQueueController.java   |  43 +++----
 .../ConsumeBlockingQueueControllerTest.java     | 124 +++++++++++++++++++
 .../inmemcubing/ITDoggedCubeBuilderTest.java    |   5 +-
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  13 ++
 6 files changed, 164 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index a9e55f7..81f44a0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -78,7 +78,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         hasException = true;
     }
 
-    public long getOutputRowCount() {
+    public int getOutputRowCount() {
         return outputRowCount.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index ccd7137..d761505 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -50,7 +50,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
 
-    private int splitRowThreshold = Integer.MAX_VALUE;
     private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
 
     public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
@@ -62,11 +61,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             unitRows /= 10;
     }
 
-    public void setSplitRowThreshold(int rowThreshold) {
-        this.splitRowThreshold = rowThreshold;
-        this.unitRows = Math.min(unitRows, rowThreshold);
-    }
-
     @Override
     public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
             throws IOException {
@@ -80,6 +74,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
         public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
                 throws IOException {
+            final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
+                    .getQueueController(inputConverterUnit, input, unitRows);
+
             final List<SplitThread> splits = new ArrayList<SplitThread>();
             final Merger merger = new Merger();
 
@@ -88,8 +85,11 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
             try {
                 while (true) {
-                    SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController
-                            .getQueueController(inputConverterUnit, input, unitRows));
+                    if (inputController.ifEnd()) {
+                        break;
+                    }
+
+                    SplitThread last = new SplitThread(splits.size() + 1, inputController);
                     splits.add(last);
 
                     last.start();
@@ -99,9 +99,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     last.join();
 
                     checkException(splits);
-                    if (last.inputController.ifEnd()) {
-                        break;
-                    }
                 }
 
                 logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 49cbe1f..5fc3e32 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -31,24 +31,34 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
    
     private T currentObject = null;
     private volatile boolean ifEnd = false;
-    private volatile boolean cut = false;
-    private long outputRowCountCut = 0L;
 
     @Override
-    public boolean hasNext() {
+    public boolean hasNext() { // should be idempotent
+        if (ifEnd) {
+            return false;
+        }
         if (currentObject != null) {
-            return hasNext(currentObject);
+            return true;
         }
         if (!super.hasNext()) {
             return false;
         }
         currentObject = super.next();
-        return hasNext(currentObject);
+
+        if (inputConverterUnit.ifEnd(currentObject)) {
+            ifEnd = true;
+            return false;
+        } else if (inputConverterUnit.ifCut(currentObject)) {
+            currentObject = null;
+            hasNext();
+            return false;
+        }
+        return true;
     }
 
     @Override
     public T next() {
-        if (ifEnd())
+        if (ifEnd() || currentObject == null)
             throw new IllegalStateException();
 
         T result = currentObject;
@@ -59,18 +69,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public boolean ifEnd() {
         return ifEnd;
     }
-
-    private boolean hasNext(T object) {
-        if (inputConverterUnit.ifEnd(object)) {
-            ifEnd = true;
-            return false;
-        }else if(cut){
-            return false;
-        }else if(inputConverterUnit.ifCut(object)){
-            return false;
-        }
-        return true;
-    }
     
     public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -79,13 +77,4 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize);
     }
-
-    public void forceCutPipe() {
-        cut = true;
-        outputRowCountCut = getOutputRowCount();
-    }
-
-    public long getOutputRowCountAfterCut() {
-        return getOutputRowCount() - outputRowCountCut;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
new file mode 100644
index 0000000..a443596
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeBlockingQueueControllerTest {
+    private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
+
+    @Test
+    public void testIterator() {
+        final int nRecord = 4345;
+        final int nCut = 2000;
+        final int nBatch = 60;
+
+        final BlockingQueue<String> input = new LinkedBlockingQueue<>();
+        final RecordConsumeBlockingQueueController inputController = RecordConsumeBlockingQueueController
+                .getQueueController(new InputConverterUnitTest(), input, nBatch);
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (int i = 1; i <= nRecord; i++) {
+                        input.put("test");
+                        if (i % nCut == 0) {
+                            input.put(InputConverterUnitTest.CUT_ROW);
+                        }
+                    }
+                    input.put(InputConverterUnitTest.END_ROW);
+                } catch (InterruptedException e) {
+                    logger.warn("Fail to produce records into BlockingQueue due to: " + e);
+                }
+            }
+        }).start();
+
+        final AtomicInteger nRecordConsumed = new AtomicInteger(0);
+        final AtomicInteger nSplit = new AtomicInteger(0);
+        while (true) {
+            // producer done & consume the end row flag
+            if (inputController.ifEnd()) {
+                break;
+            }
+            nSplit.incrementAndGet();
+            Thread consumer = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (inputController.hasNext()) {
+                        inputController.next();
+                        nRecordConsumed.incrementAndGet();
+                    }
+                    System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit.get());
+                }
+            });
+            consumer.start();
+            try {
+                consumer.join();
+            } catch (InterruptedException e) {
+                logger.warn("Fail to join consumer thread: " + e);
+            }
+        }
+
+        Assert.assertEquals(nRecord, nRecordConsumed.get());
+    }
+
+    private static class InputConverterUnitTest implements InputConverterUnit<String> {
+        public static final String END_ROW = new String();
+        public static final String CUT_ROW = "0";
+
+        @Override
+        public void convert(String currentObject, GTRecord record) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean ifEnd(String currentObject) {
+            return currentObject == END_ROW;
+        }
+
+        @Override
+        public boolean ifCut(String currentObject) {
+            return currentObject == CUT_ROW;
+        }
+
+        @Override
+        public String getEndRow() {
+            return END_ROW;
+        }
+
+        @Override
+        public String getCutRow() {
+            return CUT_ROW;
+        }
+
+        @Override
+        public boolean ifChange() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 0338da8..865cdbb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -89,12 +89,11 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
         DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
-        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
         FileRecordWriter doggedResult = new FileRecordWriter();
 
         {
             Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
-            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
             future.get();
             doggedResult.close();
         }
@@ -110,7 +109,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
             inmemResult.close();
         }
 
-        fileCompare(doggedResult.file, inmemResult.file);
+        fileCompare(inmemResult.file, doggedResult.file);
         doggedResult.file.delete();
         inmemResult.file.delete();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 3f97f80..f1b65b0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -151,6 +151,11 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
             long randSeed) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, randSeed, Integer.MAX_VALUE);
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
+            long randSeed, int splitRowThreshold) throws IOException, InterruptedException {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
         int nColumns = flatDesc.getAllColumns().size();
 
@@ -178,6 +183,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
             rand.setSeed(randSeed);
 
         // output with random data
+        int countOfLastSplit = 0;
         for (; count > 0; count--) {
             String[] row = new String[nColumns];
             for (int i = 0; i < nColumns; i++) {
@@ -185,6 +191,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
                 row[i] = candidates[rand.nextInt(candidates.length)];
             }
             queue.put(row);
+
+            // put cut row if possible
+            countOfLastSplit++;
+            if (countOfLastSplit >= splitRowThreshold) {
+                queue.put(InputConverterUnitForRawData.CUT_ROW);
+                countOfLastSplit = 0;
+            }
         }
         queue.put(InputConverterUnitForRawData.END_ROW);
     }