You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:07 UTC

[30/50] incubator-kylin git commit: KYLIN-653 use a FIFOIterable to solve ConcurrentModificationException

KYLIN-653 use a FIFOIterable to solve ConcurrentModificationException


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d09e00d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d09e00d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d09e00d6

Branch: refs/heads/streaming-localdict
Commit: d09e00d6c60e6d0e88c2512041c76456dd5fb64d
Parents: 4df0531
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 11:23:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/FIFOIterable.java  | 20 ++++++++++++
 .../apache/kylin/common/util/FIFOIterator.java  | 34 ++++++++++++++++++++
 .../org/apache/kylin/common/util/BasicTest.java | 12 +++++--
 .../model/IIKeyValueCodecWithState.java         |  6 ++--
 .../IIKeyValueCodecWithStateTest.java           | 16 ++++++---
 5 files changed, 80 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java b/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
new file mode 100644
index 0000000..c0f7d68
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/27/15.
+ */
+public class FIFOIterable<T> implements Iterable<T> {
+    private Queue<T> q;
+
+    public FIFOIterable(Queue<T> q) {
+        this.q = q;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return new FIFOIterator<T>(q);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java b/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
new file mode 100644
index 0000000..6751cb0
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/27/15.
+ *
+ * Normal iterators in Collections are fail-safe,
+ * i.e. adding elements to a queue will break current iterator.
+ * The FIFOIterator is stateless, it only check the first element of a Queue
+ */
+public class FIFOIterator<T> implements Iterator<T> {
+    private Queue<T> q;
+
+    public FIFOIterator(Queue<T> q) {
+        this.q = q;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !q.isEmpty();
+    }
+
+    @Override
+    public T next() {
+        return q.poll();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index a480ebd..0b92bf9 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
+import java.util.*;
 import java.util.concurrent.*;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.configuration.ConfigurationException;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -75,7 +76,7 @@ public class BasicTest {
 
         a.setTimeInMillis(current);
         b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
-        c.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY),0);
+        c.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
 
         System.out.println(time(b.getTimeInMillis()));
         System.out.println(time(c.getTimeInMillis()));
@@ -85,6 +86,13 @@ public class BasicTest {
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
+        Queue<String> a = new LinkedList<>();
+        Iterator<String> i = new FIFOIterator<String>(a);
+        System.out.println(i.hasNext());
+        a.add("1");
+        System.out.println(i.hasNext());
+        System.out.println(i.next());
+
     }
 
     private static String time(long t) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
index e838283..82f1020 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
@@ -2,8 +2,10 @@ package org.apache.kylin.invertedindex.model;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.FIFOIterator;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 
@@ -25,12 +27,12 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
 
     protected static class IIRowDecoderWithState extends IIRowDecoder {
 
-        final ArrayList<IIRow> buffer = Lists.newArrayList();
+        final LinkedList<IIRow> buffer = Lists.newLinkedList();
         private Iterator<Slice> superIterator = null;
 
         private IIRowDecoderWithState(TableRecordInfoDigest digest, Iterator<IIRow> iiRowIterator) {
             super(digest, iiRowIterator);
-            this.feedingIterator = buffer.iterator();
+            this.feedingIterator = new FIFOIterator<>(buffer);
         }
 
         private Iterator<Slice> getSuperIterator() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
index 25e250c..416d31a 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.Nullable;
 
+import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
@@ -19,6 +20,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
 import org.apache.kylin.streaming.Stream;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +40,7 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
 
     final String[] inputs = new String[] { //
     "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
-            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
             "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
 
     @Before
@@ -64,14 +66,19 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
         future.get();
     }
 
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
     @Test
     public void basicTest() {
-        ArrayList<IIRow> buffer = Lists.newArrayList();
+        Queue<IIRow> buffer = Lists.newLinkedList();
+        FIFOIterable bufferIterable = new FIFOIterable(buffer);
         TableRecordInfo info = new TableRecordInfo(iiDesc);
         TableRecordInfoDigest digest = info.getDigest();
-        int columnCount = digest.getColumnCount();
         KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
-        Iterator<Slice> slices = codec.decodeKeyValue(buffer).iterator();
+        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
 
         Assert.assertTrue(!slices.hasNext());
         Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
@@ -87,5 +94,6 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
         }
 
         Slice newSlice = slices.next();
+        Assert.assertEquals(newSlice.getLocalDictionaries().get(0).getSize(), 2);
     }
 }