You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:24:12 UTC

[19/19] kylin git commit: APACHE-KYLIN-2732: fix bug of missing records

APACHE-KYLIN-2732: fix bug of missing records

Signed-off-by: Zhong <nj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/84779827
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/84779827
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/84779827

Branch: refs/heads/master
Commit: 84779827ad56673848c0e2f6b589a406dac1bce2
Parents: 8aef23a
Author: Wang Ken <mi...@ebay.com>
Authored: Sat Dec 2 14:40:03 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sun Dec 3 00:55:36 2017 +0800

----------------------------------------------------------------------
 .../ConsumeBlockingQueueController.java         |   6 +-
 .../RecordConsumeBlockingQueueController.java   |  50 ++++----
 .../ConsumeBlockingQueueControllerTest.java     | 127 +++++++++++++++++++
 3 files changed, 156 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index a9e55f7..8875618 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -48,7 +48,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         if (hasException) {
             return false;
         }
-        if (internalIT.hasNext()) {
+        if (hasNextInBuffer()) {
             return true;
         } else {
             batchBuffer.clear();
@@ -74,6 +74,10 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         throw new UnsupportedOperationException();
     }
 
+    protected boolean hasNextInBuffer() {
+        return internalIT.hasNext();
+    }
+
     public void findException() {
         hasException = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 49cbe1f..8c51fc6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube.inmemcubing;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.BlockingQueue;
 
 public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> {
@@ -31,25 +32,39 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
    
     private T currentObject = null;
     private volatile boolean ifEnd = false;
-    private volatile boolean cut = false;
-    private long outputRowCountCut = 0L;
+    private volatile boolean ifCut = false;
 
     @Override
     public boolean hasNext() {
         if (currentObject != null) {
-            return hasNext(currentObject);
+            return true;
         }
-        if (!super.hasNext()) {
-            return false;
+        if (ifCut) {
+            if (!super.hasNextInBuffer()) {
+                return false;
+            }
+        } else {
+            if (!super.hasNext()) {
+                return false;
+            }
         }
+
         currentObject = super.next();
-        return hasNext(currentObject);
+        if (inputConverterUnit.ifEnd(currentObject)) {
+            ifEnd = true;
+            return false;
+        } else if (inputConverterUnit.ifCut(currentObject)) {
+            ifCut = true;
+            currentObject = null;
+            return hasNext();
+        }
+        return true;
     }
 
     @Override
     public T next() {
-        if (ifEnd())
-            throw new IllegalStateException();
+        if (ifEnd() || currentObject == null)
+            throw new NoSuchElementException();
 
         T result = currentObject;
         currentObject = null;
@@ -59,18 +74,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public boolean ifEnd() {
         return ifEnd;
     }
-
-    private boolean hasNext(T object) {
-        if (inputConverterUnit.ifEnd(object)) {
-            ifEnd = true;
-            return false;
-        }else if(cut){
-            return false;
-        }else if(inputConverterUnit.ifCut(object)){
-            return false;
-        }
-        return true;
-    }
     
     public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE);
@@ -81,11 +84,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     }
 
     public void forceCutPipe() {
-        cut = true;
-        outputRowCountCut = getOutputRowCount();
-    }
-
-    public long getOutputRowCountAfterCut() {
-        return getOutputRowCount() - outputRowCountCut;
+        ifCut = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/84779827/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
new file mode 100644
index 0000000..681ae62
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeBlockingQueueControllerTest {
+    private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
+
+    @Test
+    public void testIterator() {
+        final int nRecord = 4345;
+        final int nCut = 2000;
+        final int nBatch = 60;
+
+        final BlockingQueue<String> input = new LinkedBlockingQueue<>();
+        Thread producer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (int i = 1; i <= nRecord; i++) {
+                        input.put("test");
+                        if (i % nCut == 0) {
+                            input.put(InputConverterUnitTest.CUT_ROW);
+                        }
+                    }
+                    input.put(InputConverterUnitTest.END_ROW);
+                } catch (InterruptedException e) {
+                    logger.warn("Fail to produce records into BlockingQueue due to: " + e);
+                }
+            }
+        });
+
+        final AtomicInteger nRecordConsumed = new AtomicInteger(0);
+        Thread consumer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int nSplit = 0;
+                while (true) {
+                    RecordConsumeBlockingQueueController blockingQueueController = RecordConsumeBlockingQueueController
+                            .getQueueController(new InputConverterUnitTest(), input, nBatch);
+                    while (blockingQueueController.hasNext()) {
+                        blockingQueueController.next();
+                        nRecordConsumed.incrementAndGet();
+                    }
+                    System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit);
+                    nSplit++;
+
+                    if (blockingQueueController.ifEnd()) {
+                        break;
+                    }
+                }
+            }
+        });
+
+        producer.start();
+        consumer.start();
+
+        try {
+            producer.join();
+            consumer.join();
+        } catch (InterruptedException e) {
+            logger.warn("Fail to join threads: " + e);
+        }
+
+        Assert.assertEquals(nRecord, nRecordConsumed.get());
+    }
+
+    private static class InputConverterUnitTest implements InputConverterUnit<String> {
+        public static final String END_ROW = new String();
+        public static final String CUT_ROW = "0";
+
+        @Override
+        public void convert(String currentObject, GTRecord record) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean ifEnd(String currentObject) {
+            return currentObject == END_ROW;
+        }
+
+        @Override
+        public boolean ifCut(String currentObject) {
+            return currentObject == CUT_ROW;
+        }
+
+        @Override
+        public String getEndRow() {
+            return END_ROW;
+        }
+
+        @Override
+        public String getCutRow() {
+            return CUT_ROW;
+        }
+
+        @Override
+        public boolean ifChange() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}