You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:24:12 UTC
[19/19] kylin git commit: APACHE-KYLIN-2732: fix bug of missing
records
APACHE-KYLIN-2732: fix bug of missing records
Signed-off-by: Zhong <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/84779827
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/84779827
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/84779827
Branch: refs/heads/master
Commit: 84779827ad56673848c0e2f6b589a406dac1bce2
Parents: 8aef23a
Author: Wang Ken <mi...@ebay.com>
Authored: Sat Dec 2 14:40:03 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sun Dec 3 00:55:36 2017 +0800
----------------------------------------------------------------------
.../ConsumeBlockingQueueController.java | 6 +-
.../RecordConsumeBlockingQueueController.java | 50 ++++----
.../ConsumeBlockingQueueControllerTest.java | 127 +++++++++++++++++++
3 files changed, 156 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index a9e55f7..8875618 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -48,7 +48,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
if (hasException) {
return false;
}
- if (internalIT.hasNext()) {
+ if (hasNextInBuffer()) {
return true;
} else {
batchBuffer.clear();
@@ -74,6 +74,10 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
throw new UnsupportedOperationException();
}
+ protected boolean hasNextInBuffer() {
+ return internalIT.hasNext();
+ }
+
public void findException() {
hasException = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 49cbe1f..8c51fc6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -18,6 +18,7 @@
package org.apache.kylin.cube.inmemcubing;
+import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> {
@@ -31,25 +32,39 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
private T currentObject = null;
private volatile boolean ifEnd = false;
- private volatile boolean cut = false;
- private long outputRowCountCut = 0L;
+ private volatile boolean ifCut = false;
@Override
public boolean hasNext() {
if (currentObject != null) {
- return hasNext(currentObject);
+ return true;
}
- if (!super.hasNext()) {
- return false;
+ if (ifCut) {
+ if (!super.hasNextInBuffer()) {
+ return false;
+ }
+ } else {
+ if (!super.hasNext()) {
+ return false;
+ }
}
+
currentObject = super.next();
- return hasNext(currentObject);
+ if (inputConverterUnit.ifEnd(currentObject)) {
+ ifEnd = true;
+ return false;
+ } else if (inputConverterUnit.ifCut(currentObject)) {
+ ifCut = true;
+ currentObject = null;
+ return hasNext();
+ }
+ return true;
}
@Override
public T next() {
- if (ifEnd())
- throw new IllegalStateException();
+ if (ifEnd() || currentObject == null)
+ throw new NoSuchElementException();
T result = currentObject;
currentObject = null;
@@ -59,18 +74,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
public boolean ifEnd() {
return ifEnd;
}
-
- private boolean hasNext(T object) {
- if (inputConverterUnit.ifEnd(object)) {
- ifEnd = true;
- return false;
- }else if(cut){
- return false;
- }else if(inputConverterUnit.ifCut(object)){
- return false;
- }
- return true;
- }
public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -81,11 +84,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
}
public void forceCutPipe() {
- cut = true;
- outputRowCountCut = getOutputRowCount();
- }
-
- public long getOutputRowCountAfterCut() {
- return getOutputRowCount() - outputRowCountCut;
+ ifCut = true;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
new file mode 100644
index 0000000..681ae62
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeBlockingQueueControllerTest {
+ private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
+
+ @Test
+ public void testIterator() {
+ final int nRecord = 4345;
+ final int nCut = 2000;
+ final int nBatch = 60;
+
+ final BlockingQueue<String> input = new LinkedBlockingQueue<>();
+ Thread producer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 1; i <= nRecord; i++) {
+ input.put("test");
+ if (i % nCut == 0) {
+ input.put(InputConverterUnitTest.CUT_ROW);
+ }
+ }
+ input.put(InputConverterUnitTest.END_ROW);
+ } catch (InterruptedException e) {
+ logger.warn("Fail to produce records into BlockingQueue due to: " + e);
+ }
+ }
+ });
+
+ final AtomicInteger nRecordConsumed = new AtomicInteger(0);
+ Thread consumer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int nSplit = 0;
+ while (true) {
+ RecordConsumeBlockingQueueController blockingQueueController = RecordConsumeBlockingQueueController
+ .getQueueController(new InputConverterUnitTest(), input, nBatch);
+ while (blockingQueueController.hasNext()) {
+ blockingQueueController.next();
+ nRecordConsumed.incrementAndGet();
+ }
+ System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit);
+ nSplit++;
+
+ if (blockingQueueController.ifEnd()) {
+ break;
+ }
+ }
+ }
+ });
+
+ producer.start();
+ consumer.start();
+
+ try {
+ producer.join();
+ consumer.join();
+ } catch (InterruptedException e) {
+ logger.warn("Fail to join threads: " + e);
+ }
+
+ Assert.assertEquals(nRecord, nRecordConsumed.get());
+ }
+
+ private static class InputConverterUnitTest implements InputConverterUnit<String> {
+ public static final String END_ROW = new String();
+ public static final String CUT_ROW = "0";
+
+ @Override
+ public void convert(String currentObject, GTRecord record) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean ifEnd(String currentObject) {
+ return currentObject == END_ROW;
+ }
+
+ @Override
+ public boolean ifCut(String currentObject) {
+ return currentObject == CUT_ROW;
+ }
+
+ @Override
+ public String getEndRow() {
+ return END_ROW;
+ }
+
+ @Override
+ public String getCutRow() {
+ return CUT_ROW;
+ }
+
+ @Override
+ public boolean ifChange() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}