You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:07 UTC
[30/50] incubator-kylin git commit: KYLIN-653 use a FIFOIterable to
solve ConcurrentModificationException
KYLIN-653 use a FIFOIterable to solve ConcurrentModificationException
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d09e00d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d09e00d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d09e00d6
Branch: refs/heads/streaming-localdict
Commit: d09e00d6c60e6d0e88c2512041c76456dd5fb64d
Parents: 4df0531
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 11:23:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/FIFOIterable.java | 20 ++++++++++++
.../apache/kylin/common/util/FIFOIterator.java | 34 ++++++++++++++++++++
.../org/apache/kylin/common/util/BasicTest.java | 12 +++++--
.../model/IIKeyValueCodecWithState.java | 6 ++--
.../IIKeyValueCodecWithStateTest.java | 16 ++++++---
5 files changed, 80 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java b/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
new file mode 100644
index 0000000..c0f7d68
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * Iterable whose iterator() wraps the given Queue in a FIFOIterator. Created by Hongbin Ma(Binmahone) on 3/27/15.
+ */
+public class FIFOIterable<T> implements Iterable<T> {
+    private final Queue<T> backingQueue;
+
+    public FIFOIterable(Queue<T> q) {
+        backingQueue = q;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return new FIFOIterator<T>(backingQueue);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java b/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
new file mode 100644
index 0000000..6751cb0
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/27/15.
+ *
+ * Iterators of LinkedList and similar collections are fail-fast, i.e. adding elements
+ * to the queue breaks the current iterator (ConcurrentModificationException).
+ * FIFOIterator keeps no cursor: it only checks and removes the queue head, draining the queue.
+ */
+public class FIFOIterator<T> implements Iterator<T> {
+    private final Queue<T> q;
+
+    public FIFOIterator(Queue<T> q) {
+        this.q = q;
+    }
+
+    @Override
+    public boolean hasNext() { // re-evaluated on every call, so elements added later are seen
+        return !q.isEmpty();
+    }
+
+    @Override
+    public T next() { // Queue.remove() throws NoSuchElementException when empty; poll() returned null
+        return q.remove();
+    }
+
+    @Override
+    public void remove() { // removal through the iterator is not supported
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index a480ebd..0b92bf9 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -22,9 +22,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
+import java.util.*;
import java.util.concurrent.*;
+import com.google.common.collect.Lists;
import org.apache.commons.configuration.ConfigurationException;
import org.junit.Ignore;
import org.junit.Test;
@@ -75,7 +76,7 @@ public class BasicTest {
a.setTimeInMillis(current);
b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
- c.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY),0);
+ c.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
System.out.println(time(b.getTimeInMillis()));
System.out.println(time(c.getTimeInMillis()));
@@ -85,6 +86,13 @@ public class BasicTest {
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
+ Queue<String> a = new LinkedList<>(); // backing queue, initially empty
+ Iterator<String> i = new FIFOIterator<String>(a); // iterator is created before any element is added
+ System.out.println(i.hasNext()); // queue is still empty here
+ a.add("1"); // append after the iterator already exists
+ System.out.println(i.hasNext()); // hasNext() re-checks the queue, so the new element is seen
+ System.out.println(i.next()); // takes the head element off the queue
+ // NOTE: the results above are only printed, nothing is asserted
}
private static String time(long t) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
index e838283..82f1020 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
@@ -2,8 +2,10 @@ package org.apache.kylin.invertedindex.model;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.FIFOIterator;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
@@ -25,12 +27,12 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
protected static class IIRowDecoderWithState extends IIRowDecoder {
- final ArrayList<IIRow> buffer = Lists.newArrayList();
+ final LinkedList<IIRow> buffer = Lists.newLinkedList();
private Iterator<Slice> superIterator = null;
private IIRowDecoderWithState(TableRecordInfoDigest digest, Iterator<IIRow> iiRowIterator) {
super(digest, iiRowIterator);
- this.feedingIterator = buffer.iterator();
+ this.feedingIterator = new FIFOIterator<>(buffer);
}
private Iterator<Slice> getSuperIterator() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d09e00d6/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
index 25e250c..416d31a 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
+import org.apache.kylin.common.util.FIFOIterable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
@@ -19,6 +20,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.invertedindex.model.KeyValueCodec;
import org.apache.kylin.streaming.Stream;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,7 +40,7 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
final String[] inputs = new String[] { //
"FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
- "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,2012-08-16,43479,10000807,26.2474,0", //
+ "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
"ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
@Before
@@ -64,14 +66,19 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
future.get();
}
+ @After
+ public void after() throws Exception { // tear-down hook, marked with org.junit.After
+ cleanupTestMetadata(); // not defined in this hunk; presumably inherited from LocalFileMetadataTestCase
+ }
+
@Test
public void basicTest() {
- ArrayList<IIRow> buffer = Lists.newArrayList();
+ Queue<IIRow> buffer = Lists.newLinkedList();
+ FIFOIterable bufferIterable = new FIFOIterable(buffer);
TableRecordInfo info = new TableRecordInfo(iiDesc);
TableRecordInfoDigest digest = info.getDigest();
- int columnCount = digest.getColumnCount();
KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
- Iterator<Slice> slices = codec.decodeKeyValue(buffer).iterator();
+ Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
Assert.assertTrue(!slices.hasNext());
Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
@@ -87,5 +94,6 @@ public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
}
Slice newSlice = slices.next();
+ Assert.assertEquals(newSlice.getLocalDictionaries().get(0).getSize(), 2);
}
}