You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/04 16:51:26 UTC
[1/2] kylin git commit: Revert "APACHE-KYLIN-2732: fix bug of missing records"
Repository: kylin
Updated Branches:
refs/heads/master 43d02f473 -> 5a6e6d7e3
Revert "APACHE-KYLIN-2732: fix bug of missing records"
This reverts commit 84779827ad56673848c0e2f6b589a406dac1bce2.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d103e639
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d103e639
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d103e639
Branch: refs/heads/master
Commit: d103e639db0530cb21d3f98584fe2552ddfe8b59
Parents: 43d02f4
Author: Zhong <nj...@apache.org>
Authored: Tue Dec 5 00:37:27 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Tue Dec 5 00:37:27 2017 +0800
----------------------------------------------------------------------
.../ConsumeBlockingQueueController.java | 6 +-
.../RecordConsumeBlockingQueueController.java | 50 ++++----
.../ConsumeBlockingQueueControllerTest.java | 127 -------------------
3 files changed, 27 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index 8875618..a9e55f7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -48,7 +48,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
if (hasException) {
return false;
}
- if (hasNextInBuffer()) {
+ if (internalIT.hasNext()) {
return true;
} else {
batchBuffer.clear();
@@ -74,10 +74,6 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
throw new UnsupportedOperationException();
}
- protected boolean hasNextInBuffer() {
- return internalIT.hasNext();
- }
-
public void findException() {
hasException = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 8c51fc6..49cbe1f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -18,7 +18,6 @@
package org.apache.kylin.cube.inmemcubing;
-import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> {
@@ -32,39 +31,25 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
private T currentObject = null;
private volatile boolean ifEnd = false;
- private volatile boolean ifCut = false;
+ private volatile boolean cut = false;
+ private long outputRowCountCut = 0L;
@Override
public boolean hasNext() {
if (currentObject != null) {
- return true;
+ return hasNext(currentObject);
}
- if (ifCut) {
- if (!super.hasNextInBuffer()) {
- return false;
- }
- } else {
- if (!super.hasNext()) {
- return false;
- }
- }
-
- currentObject = super.next();
- if (inputConverterUnit.ifEnd(currentObject)) {
- ifEnd = true;
+ if (!super.hasNext()) {
return false;
- } else if (inputConverterUnit.ifCut(currentObject)) {
- ifCut = true;
- currentObject = null;
- return hasNext();
}
- return true;
+ currentObject = super.next();
+ return hasNext(currentObject);
}
@Override
public T next() {
- if (ifEnd() || currentObject == null)
- throw new NoSuchElementException();
+ if (ifEnd())
+ throw new IllegalStateException();
T result = currentObject;
currentObject = null;
@@ -74,6 +59,18 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
public boolean ifEnd() {
return ifEnd;
}
+
+ private boolean hasNext(T object) {
+ if (inputConverterUnit.ifEnd(object)) {
+ ifEnd = true;
+ return false;
+ }else if(cut){
+ return false;
+ }else if(inputConverterUnit.ifCut(object)){
+ return false;
+ }
+ return true;
+ }
public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -84,6 +81,11 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
}
public void forceCutPipe() {
- ifCut = true;
+ cut = true;
+ outputRowCountCut = getOutputRowCount();
+ }
+
+ public long getOutputRowCountAfterCut() {
+ return getOutputRowCount() - outputRowCountCut;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
deleted file mode 100644
index 681ae62..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kylin.gridtable.GTRecord;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumeBlockingQueueControllerTest {
- private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
-
- @Test
- public void testIterator() {
- final int nRecord = 4345;
- final int nCut = 2000;
- final int nBatch = 60;
-
- final BlockingQueue<String> input = new LinkedBlockingQueue<>();
- Thread producer = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 1; i <= nRecord; i++) {
- input.put("test");
- if (i % nCut == 0) {
- input.put(InputConverterUnitTest.CUT_ROW);
- }
- }
- input.put(InputConverterUnitTest.END_ROW);
- } catch (InterruptedException e) {
- logger.warn("Fail to produce records into BlockingQueue due to: " + e);
- }
- }
- });
-
- final AtomicInteger nRecordConsumed = new AtomicInteger(0);
- Thread consumer = new Thread(new Runnable() {
- @Override
- public void run() {
- int nSplit = 0;
- while (true) {
- RecordConsumeBlockingQueueController blockingQueueController = RecordConsumeBlockingQueueController
- .getQueueController(new InputConverterUnitTest(), input, nBatch);
- while (blockingQueueController.hasNext()) {
- blockingQueueController.next();
- nRecordConsumed.incrementAndGet();
- }
- System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit);
- nSplit++;
-
- if (blockingQueueController.ifEnd()) {
- break;
- }
- }
- }
- });
-
- producer.start();
- consumer.start();
-
- try {
- producer.join();
- consumer.join();
- } catch (InterruptedException e) {
- logger.warn("Fail to join threads: " + e);
- }
-
- Assert.assertEquals(nRecord, nRecordConsumed.get());
- }
-
- private static class InputConverterUnitTest implements InputConverterUnit<String> {
- public static final String END_ROW = new String();
- public static final String CUT_ROW = "0";
-
- @Override
- public void convert(String currentObject, GTRecord record) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean ifEnd(String currentObject) {
- return currentObject == END_ROW;
- }
-
- @Override
- public boolean ifCut(String currentObject) {
- return currentObject == CUT_ROW;
- }
-
- @Override
- public String getEndRow() {
- return END_ROW;
- }
-
- @Override
- public String getCutRow() {
- return CUT_ROW;
- }
-
- @Override
- public boolean ifChange() {
- throw new UnsupportedOperationException();
- }
- }
-}
[2/2] kylin git commit: APACHE-KYLIN-2732: better way to fix bug of missing records
Posted by nj...@apache.org.
APACHE-KYLIN-2732: better way to fix bug of missing records
Signed-off-by: Zhong <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a6e6d7e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a6e6d7e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a6e6d7e
Branch: refs/heads/master
Commit: 5a6e6d7e38342691a057345a9d51c4375df8dc8f
Parents: d103e63
Author: Wang Ken <mi...@ebay.com>
Authored: Mon Dec 4 13:13:18 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Tue Dec 5 00:38:22 2017 +0800
----------------------------------------------------------------------
.../ConsumeBlockingQueueController.java | 2 +-
.../cube/inmemcubing/DoggedCubeBuilder.java | 19 ++-
.../RecordConsumeBlockingQueueController.java | 43 +++----
.../ConsumeBlockingQueueControllerTest.java | 124 +++++++++++++++++++
.../inmemcubing/ITDoggedCubeBuilderTest.java | 5 +-
.../inmemcubing/ITInMemCubeBuilderTest.java | 13 ++
6 files changed, 164 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index a9e55f7..81f44a0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -78,7 +78,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
hasException = true;
}
- public long getOutputRowCount() {
+ public int getOutputRowCount() {
return outputRowCount.get();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index ccd7137..d761505 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -50,7 +50,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
- private int splitRowThreshold = Integer.MAX_VALUE;
private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
@@ -62,11 +61,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
unitRows /= 10;
}
- public void setSplitRowThreshold(int rowThreshold) {
- this.splitRowThreshold = rowThreshold;
- this.unitRows = Math.min(unitRows, rowThreshold);
- }
-
@Override
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
@@ -80,6 +74,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
+ final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
+ .getQueueController(inputConverterUnit, input, unitRows);
+
final List<SplitThread> splits = new ArrayList<SplitThread>();
final Merger merger = new Merger();
@@ -88,8 +85,11 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
try {
while (true) {
- SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController
- .getQueueController(inputConverterUnit, input, unitRows));
+ if (inputController.ifEnd()) {
+ break;
+ }
+
+ SplitThread last = new SplitThread(splits.size() + 1, inputController);
splits.add(last);
last.start();
@@ -99,9 +99,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
last.join();
checkException(splits);
- if (last.inputController.ifEnd()) {
- break;
- }
}
logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 49cbe1f..5fc3e32 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -31,24 +31,34 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
private T currentObject = null;
private volatile boolean ifEnd = false;
- private volatile boolean cut = false;
- private long outputRowCountCut = 0L;
@Override
- public boolean hasNext() {
+ public boolean hasNext() { // should be idempotent
+ if (ifEnd) {
+ return false;
+ }
if (currentObject != null) {
- return hasNext(currentObject);
+ return true;
}
if (!super.hasNext()) {
return false;
}
currentObject = super.next();
- return hasNext(currentObject);
+
+ if (inputConverterUnit.ifEnd(currentObject)) {
+ ifEnd = true;
+ return false;
+ } else if (inputConverterUnit.ifCut(currentObject)) {
+ currentObject = null;
+ hasNext();
+ return false;
+ }
+ return true;
}
@Override
public T next() {
- if (ifEnd())
+ if (ifEnd() || currentObject == null)
throw new IllegalStateException();
T result = currentObject;
@@ -59,18 +69,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
public boolean ifEnd() {
return ifEnd;
}
-
- private boolean hasNext(T object) {
- if (inputConverterUnit.ifEnd(object)) {
- ifEnd = true;
- return false;
- }else if(cut){
- return false;
- }else if(inputConverterUnit.ifCut(object)){
- return false;
- }
- return true;
- }
public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -79,13 +77,4 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){
return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize);
}
-
- public void forceCutPipe() {
- cut = true;
- outputRowCountCut = getOutputRowCount();
- }
-
- public long getOutputRowCountAfterCut() {
- return getOutputRowCount() - outputRowCountCut;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
new file mode 100644
index 0000000..a443596
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeBlockingQueueControllerTest {
+ private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
+
+ @Test
+ public void testIterator() {
+ final int nRecord = 4345;
+ final int nCut = 2000;
+ final int nBatch = 60;
+
+ final BlockingQueue<String> input = new LinkedBlockingQueue<>();
+ final RecordConsumeBlockingQueueController inputController = RecordConsumeBlockingQueueController
+ .getQueueController(new InputConverterUnitTest(), input, nBatch);
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 1; i <= nRecord; i++) {
+ input.put("test");
+ if (i % nCut == 0) {
+ input.put(InputConverterUnitTest.CUT_ROW);
+ }
+ }
+ input.put(InputConverterUnitTest.END_ROW);
+ } catch (InterruptedException e) {
+ logger.warn("Fail to produce records into BlockingQueue due to: " + e);
+ }
+ }
+ }).start();
+
+ final AtomicInteger nRecordConsumed = new AtomicInteger(0);
+ final AtomicInteger nSplit = new AtomicInteger(0);
+ while (true) {
+ // producer done & consume the end row flag
+ if (inputController.ifEnd()) {
+ break;
+ }
+ nSplit.incrementAndGet();
+ Thread consumer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (inputController.hasNext()) {
+ inputController.next();
+ nRecordConsumed.incrementAndGet();
+ }
+ System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit.get());
+ }
+ });
+ consumer.start();
+ try {
+ consumer.join();
+ } catch (InterruptedException e) {
+ logger.warn("Fail to join consumer thread: " + e);
+ }
+ }
+
+ Assert.assertEquals(nRecord, nRecordConsumed.get());
+ }
+
+ private static class InputConverterUnitTest implements InputConverterUnit<String> {
+ public static final String END_ROW = new String();
+ public static final String CUT_ROW = "0";
+
+ @Override
+ public void convert(String currentObject, GTRecord record) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean ifEnd(String currentObject) {
+ return currentObject == END_ROW;
+ }
+
+ @Override
+ public boolean ifCut(String currentObject) {
+ return currentObject == CUT_ROW;
+ }
+
+ @Override
+ public String getEndRow() {
+ return END_ROW;
+ }
+
+ @Override
+ public String getCutRow() {
+ return CUT_ROW;
+ }
+
+ @Override
+ public boolean ifChange() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 0338da8..865cdbb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -89,12 +89,11 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
doggedBuilder.setConcurrentThreads(THREADS);
- doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
FileRecordWriter doggedResult = new FileRecordWriter();
{
Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
- ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
future.get();
doggedResult.close();
}
@@ -110,7 +109,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
inmemResult.close();
}
- fileCompare(doggedResult.file, inmemResult.file);
+ fileCompare(inmemResult.file, doggedResult.file);
doggedResult.file.delete();
inmemResult.file.delete();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 3f97f80..f1b65b0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -151,6 +151,11 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
long randSeed) throws IOException, InterruptedException {
+ feedData(cube, flatTable, queue, count, randSeed, Integer.MAX_VALUE);
+ }
+
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count,
+ long randSeed, int splitRowThreshold) throws IOException, InterruptedException {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
int nColumns = flatDesc.getAllColumns().size();
@@ -178,6 +183,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
rand.setSeed(randSeed);
// output with random data
+ int countOfLastSplit = 0;
for (; count > 0; count--) {
String[] row = new String[nColumns];
for (int i = 0; i < nColumns; i++) {
@@ -185,6 +191,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
row[i] = candidates[rand.nextInt(candidates.length)];
}
queue.put(row);
+
+ // put cut row if possible
+ countOfLastSplit++;
+ if (countOfLastSplit >= splitRowThreshold) {
+ queue.put(InputConverterUnitForRawData.CUT_ROW);
+ countOfLastSplit = 0;
+ }
}
queue.put(InputConverterUnitForRawData.END_ROW);
}