You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/08/15 19:55:02 UTC
[2/3] apex-malhar git commit: APEXMALHAR-2048 Added implementations
of SpillableList, SpillableMap, and SpillableArrayListMultimap
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
new file mode 100644
index 0000000..2a7947d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes lists.
+ */
+@InterfaceStability.Evolving
+public class SerdeListSlice<T> implements Serde<List<T>, Slice>
+{
+ @NotNull
+ private Serde<T, Slice> serde;
+
+ private SerdeListSlice()
+ {
+ // for Kryo
+ }
+
+ /**
+ * Creates a {@link SerdeListSlice}.
+ * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
+ */
+ public SerdeListSlice(@NotNull Serde<T, Slice> serde)
+ {
+ this.serde = Preconditions.checkNotNull(serde);
+ }
+
+ @Override
+ public Slice serialize(List<T> objects)
+ {
+ Slice[] slices = new Slice[objects.size()];
+
+ int size = 4;
+
+ for (int index = 0; index < objects.size(); index++) {
+ Slice slice = serde.serialize(objects.get(index));
+ slices[index] = slice;
+ size += slice.length;
+ }
+
+ byte[] bytes = new byte[size];
+ int offset = 0;
+
+ byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
+ System.arraycopy(sizeBytes, 0, bytes, offset, 4);
+ offset += 4;
+
+ for (int index = 0; index < slices.length; index++) {
+ Slice slice = slices[index];
+ System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
+ offset += slice.length;
+ }
+
+ return new Slice(bytes);
+ }
+
+ @Override
+ public List<T> deserialize(Slice slice, MutableInt offset)
+ {
+ MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
+
+ int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
+ List<T> list = Lists.newArrayListWithCapacity(numElements);
+ sliceOffset.subtract(slice.offset);
+
+ for (int index = 0; index < numElements; index++) {
+ T object = serde.deserialize(slice, sliceOffset);
+ list.add(object);
+ }
+
+ offset.setValue(sliceOffset.intValue());
+ return list;
+ }
+
+ @Override
+ public List<T> deserialize(Slice slice)
+ {
+ return deserialize(slice, new MutableInt(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
new file mode 100644
index 0000000..80ee597
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
+ */
+@InterfaceStability.Evolving
+public class SerdeStringSlice implements Serde<String, Slice>
+{
+ @Override
+ public Slice serialize(String object)
+ {
+ return new Slice(GPOUtils.serializeString(object));
+ }
+
+ @Override
+ public String deserialize(Slice object, MutableInt offset)
+ {
+ offset.add(object.offset);
+ String string = GPOUtils.deserializeString(object.buffer, offset);
+ offset.subtract(object.offset);
+ return string;
+ }
+
+ @Override
+ public String deserialize(Slice object)
+ {
+ return deserialize(object, new MutableInt(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
new file mode 100644
index 0000000..b6a61f4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A utility class which contains static methods for manipulating byte arrays and {@link Slice}s
+ */
+@InterfaceStability.Evolving
+public class SliceUtils
+{
+ private SliceUtils()
+ {
+ }
+
+ /**
+ * Concatenates two byte arrays.
+ * @param a The first byte array to concatenate.
+ * @param b The second byte array to concatenate.
+ * @return The concatenated byte arrays.
+ */
+ public static byte[] concatenate(byte[] a, byte[] b)
+ {
+ byte[] output = new byte[a.length + b.length];
+
+ System.arraycopy(a, 0, output, 0, a.length);
+ System.arraycopy(b, 0, output, a.length, b.length);
+ return output;
+ }
+
+ /**
+ * Concatenates two {@link Slice}s
+ * @param a The first {@link Slice} to concatenate.
+ * @param b The second {@link Slice} to concatenate.
+ * @return The concatenated {@link Slice}.
+ */
+ public static Slice concatenate(Slice a, Slice b)
+ {
+ int size = a.length + b.length;
+ byte[] bytes = new byte[size];
+
+ System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+ System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+ return new Slice(bytes);
+ }
+
+ /**
+ * Concatenates a byte array with the contents of a {@link Slice}.
+ * @param a The byte array to concatenate. The contents of the byte array appear first in the concatenation.
+ * @param b The {@link Slice} to concatenate a byte array with.
+ * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}.
+ */
+ public static Slice concatenate(byte[] a, Slice b)
+ {
+ int size = a.length + b.length;
+ byte[] bytes = new byte[size];
+
+ System.arraycopy(a, 0, bytes, 0, a.length);
+ System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+ return new Slice(bytes);
+ }
+
+ /**
+ * Concatenates a byte array with the contents of a {@link Slice}.
+ * @param a The byte array to concatenate.
+ * @param b The {@link Slice} to concatenate a byte array with. The contents of the {@link Slice} appear first in the
+ * concatenation.
+ * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}.
+ */
+ public static Slice concatenate(Slice a, byte[] b)
+ {
+ int size = a.length + b.length;
+ byte[] bytes = new byte[size];
+
+ System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+ System.arraycopy(b, 0, bytes, a.length, b.length);
+
+ return new Slice(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index f3b2140..673054b 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -26,15 +26,40 @@ import org.junit.runner.Description;
import org.apache.commons.io.FileUtils;
+import com.google.common.base.Preconditions;
+
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
+import com.datatorrent.netlet.util.Slice;
public class TestUtils
{
+ public static byte[] getByte(int val)
+ {
+ Preconditions.checkArgument(val <= Byte.MAX_VALUE);
+ return new byte[]{(byte)val};
+ }
+
+ public static byte[] getBytes(int val)
+ {
+ byte[] bytes = new byte[4];
+ bytes[0] = (byte)(val & 0xFF);
+ bytes[1] = (byte)((val >> 8) & 0xFF);
+ bytes[2] = (byte)((val >> 16) & 0xFF);
+ bytes[3] = (byte)((val >> 24) & 0xFF);
+
+ return bytes;
+ }
+
+ public static Slice getSlice(int val)
+ {
+ return new Slice(getBytes(val));
+ }
+
public static class TestInfo extends TestWatcher
{
public org.junit.runner.Description desc;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index d3564ba..c57e0ca 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -44,7 +44,8 @@ import com.datatorrent.netlet.util.Slice;
public class ManagedStateTestUtils
{
- static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket,
+ public static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue>
+ unsavedBucket,
int keysPerTimeBucket) throws IOException
{
RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
@@ -82,7 +83,7 @@ public class ManagedStateTestUtils
Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk);
}
- static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
+ public static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
{
Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
for (int i = startBucket; i < endBucket; i++) {
@@ -92,7 +93,7 @@ public class ManagedStateTestUtils
return data;
}
- static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
+ public static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
{
Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
for (int j = 0; j < 5; j++) {
@@ -103,14 +104,14 @@ public class ManagedStateTestUtils
return bucketData;
}
- static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
+ public static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
{
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, applicationPath);
return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
}
- static Context.OperatorContext getOperatorContext(int operatorId)
+ public static Context.OperatorContext getOperatorContext(int operatorId)
{
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
@@ -118,7 +119,7 @@ public class ManagedStateTestUtils
private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
- static Slice getSliceFor(String x)
+ public static Slice getSliceFor(String x)
{
return new Slice(x.getBytes());
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
deleted file mode 100644
index d73bd95..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemMultiset;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemMultisetTest
-{
- @Test
- public void serializationTest()
- {
- InMemMultiset<String> set = new InMemMultiset<>();
-
- set.add("a");
- set.add("a");
-
- InMemMultiset<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), set);
-
- Assert.assertEquals(2, cloned.count("a"));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
deleted file mode 100644
index 573982a..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableArrayList;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemSpillableArrayListTest
-{
- @Test
- public void serializationTest()
- {
- InMemSpillableArrayList<String> list = new InMemSpillableArrayList<>();
-
- list.add("a");
- list.add("a");
-
- InMemSpillableArrayList<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), list);
-
- Assert.assertEquals(2, list.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
deleted file mode 100644
index a6bf811..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableByteArrayListMultimap;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemSpillableByteArrayListMultimapTest
-{
- @Test
- public void serializationTest()
- {
- InMemSpillableByteArrayListMultimap<String, String> multimap =
- new InMemSpillableByteArrayListMultimap<String, String>();
-
- multimap.put("a", "b");
- multimap.put("a", "c");
-
- InMemSpillableByteArrayListMultimap<String, String> cloned = KryoCloneUtils.cloneObject(new Kryo(), multimap);
-
- Assert.assertEquals(2, cloned.get("a").size());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
new file mode 100644
index 0000000..f40ae87
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.TestUtils;
+
+public class SequentialSpillableIdentifierGeneratorTest
+{
+ @Test
+ public void dontAllowRegistrationAfterNextCallTest()
+ {
+ SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+ gen.next();
+
+ boolean exception = false;
+
+ try {
+ gen.register(TestUtils.getByte(1));
+ } catch (Exception e) {
+ exception = true;
+ }
+
+ Assert.assertTrue(exception);
+ }
+
+ @Test
+ public void simpleSequentialIdGenerationTest()
+ {
+ SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+ for (int index = 0; index < (((int)Byte.MAX_VALUE) + 1); index++) {
+ byte[] id = gen.next();
+
+ checkId(index, id);
+ }
+
+ boolean threwException = false;
+
+ try {
+ gen.next();
+ } catch (Exception e) {
+ threwException = true;
+ }
+
+ Assert.assertTrue(threwException);
+ }
+
+ @Test
+ public void registerFirst()
+ {
+ SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+ gen.register(TestUtils.getByte(0));
+
+ byte[] id = gen.next();
+
+ Assert.assertArrayEquals(TestUtils.getByte(1), id);
+ }
+
+ @Test
+ public void registerLast()
+ {
+ SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+ gen.register(TestUtils.getByte(Byte.MAX_VALUE));
+
+ for (int index = 0; index <= (((int)Byte.MAX_VALUE) - 1); index++) {
+ byte[] id = gen.next();
+
+ checkId(index, id);
+ }
+
+ boolean threwException = false;
+
+ try {
+ gen.next();
+ } catch (Exception e) {
+ threwException = true;
+ }
+
+ Assert.assertTrue(threwException);
+ }
+
+ @Test
+ public void intermingledRegistered()
+ {
+ SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+ gen.register(TestUtils.getByte(1));
+ gen.register(TestUtils.getByte(2));
+ gen.register(TestUtils.getByte(5));
+ gen.register(TestUtils.getByte(7));
+
+ checkId(0, gen.next());
+ checkId(3, gen.next());
+ checkId(4, gen.next());
+ checkId(6, gen.next());
+ checkId(8, gen.next());
+ checkId(9, gen.next());
+ }
+
+ private void checkId(int val, byte[] id)
+ {
+ Assert.assertEquals(1, id.length);
+ Assert.assertEquals(val, id[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
new file mode 100644
index 0000000..af05c88
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableArrayListImplTest
+{
+ public static final byte[] ID1 = new byte[]{(byte)0};
+ public static final byte[] ID2 = new byte[]{(byte)1};
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ @Test
+ public void simpleAddGetAndSetTest1()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleAddGetAndSetTest1Helper(store);
+ }
+
+ @Test
+ public void simpleAddGetAndSetManagedStateTest1()
+ {
+ simpleAddGetAndSetTest1Helper(testMeta.store);
+ }
+
+ public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+ {
+ SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+ new SerdeStringSlice(), 1);
+
+ store.setup(testMeta.operatorContext);
+ list.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkOutOfBounds(list, 0);
+ Assert.assertEquals(0, list.size());
+
+ list.add("a");
+
+ SpillableTestUtils.checkOutOfBounds(list, 1);
+ Assert.assertEquals(1, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+
+ list.addAll(Lists.newArrayList("a", "b", "c"));
+
+ Assert.assertEquals(4, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("a", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("c", list.get(3));
+
+ SpillableTestUtils.checkOutOfBounds(list, 4);
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+
+ Assert.assertEquals(4, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("a", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("c", list.get(3));
+
+ list.add("tt");
+ list.add("ab");
+ list.add("99");
+ list.add("oo");
+
+ Assert.assertEquals("tt", list.get(4));
+ Assert.assertEquals("ab", list.get(5));
+ Assert.assertEquals("99", list.get(6));
+ Assert.assertEquals("oo", list.get(7));
+
+ list.set(1, "111");
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("111", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("c", list.get(3));
+ Assert.assertEquals("tt", list.get(4));
+ Assert.assertEquals("ab", list.get(5));
+ Assert.assertEquals("99", list.get(6));
+ Assert.assertEquals("oo", list.get(7));
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo"));
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ list.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void simpleAddGetAndSetTest3()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleAddGetAndSetTest3Helper(store);
+ }
+
+ @Test
+ public void simpleAddGetAndSetManagedStateTest3()
+ {
+ simpleAddGetAndSetTest3Helper(testMeta.store);
+ }
+
+ private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
+ {
+ SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+ new SerdeStringSlice(), 3);
+
+ store.setup(testMeta.operatorContext);
+ list.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkOutOfBounds(list, 0);
+ Assert.assertEquals(0, list.size());
+
+ list.add("a");
+
+ SpillableTestUtils.checkOutOfBounds(list, 1);
+ Assert.assertEquals(1, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+
+ list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+ Assert.assertEquals(8, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("a", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("c", list.get(3));
+ Assert.assertEquals("d", list.get(4));
+ Assert.assertEquals("e", list.get(5));
+ Assert.assertEquals("f", list.get(6));
+ Assert.assertEquals("g", list.get(7));
+
+ SpillableTestUtils.checkOutOfBounds(list, 20);
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g"));
+
+ Assert.assertEquals(8, list.size());
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("a", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("c", list.get(3));
+ Assert.assertEquals("d", list.get(4));
+ Assert.assertEquals("e", list.get(5));
+ Assert.assertEquals("f", list.get(6));
+ Assert.assertEquals("g", list.get(7));
+
+ list.add("tt");
+ list.add("ab");
+ list.add("99");
+ list.add("oo");
+
+ Assert.assertEquals("tt", list.get(8));
+ Assert.assertEquals("ab", list.get(9));
+ Assert.assertEquals("99", list.get(10));
+ Assert.assertEquals("oo", list.get(11));
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "oo"));
+
+ list.set(1, "111");
+ list.set(3, "222");
+ list.set(5, "333");
+ list.set(11, "444");
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("111", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("222", list.get(3));
+ Assert.assertEquals("d", list.get(4));
+ Assert.assertEquals("333", list.get(5));
+ Assert.assertEquals("f", list.get(6));
+ Assert.assertEquals("g", list.get(7));
+ Assert.assertEquals("tt", list.get(8));
+ Assert.assertEquals("ab", list.get(9));
+ Assert.assertEquals("99", list.get(10));
+ Assert.assertEquals("444", list.get(11));
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "111", "b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("222", "d", "333"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "444"));
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ list.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void simpleMultiListTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleMultiListTestHelper(store);
+ }
+
+ @Test
+ public void simpleMultiListManagedStateTest()
+ {
+ simpleMultiListTestHelper(testMeta.store);
+ }
+
+ public void simpleMultiListTestHelper(SpillableStateStore store)
+ {
+ SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store,
+ new SerdeStringSlice(), 1);
+
+ SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store,
+ new SerdeStringSlice(), 1);
+
+ store.setup(testMeta.operatorContext);
+ list1.setup(testMeta.operatorContext);
+ list2.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ list1.beginWindow(windowId);
+ list2.beginWindow(windowId);
+
+ SpillableTestUtils.checkOutOfBounds(list1, 0);
+ Assert.assertEquals(0, list1.size());
+
+ list1.add("a");
+
+ SpillableTestUtils.checkOutOfBounds(list2, 0);
+
+ list2.add("2a");
+
+ SpillableTestUtils.checkOutOfBounds(list1, 1);
+ SpillableTestUtils.checkOutOfBounds(list2, 1);
+
+ Assert.assertEquals(1, list1.size());
+ Assert.assertEquals(1, list2.size());
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("2a", list2.get(0));
+
+ list1.addAll(Lists.newArrayList("a", "b", "c"));
+ list2.addAll(Lists.newArrayList("2a", "2b"));
+
+ Assert.assertEquals(4, list1.size());
+ Assert.assertEquals(3, list2.size());
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("a", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("c", list1.get(3));
+
+ Assert.assertEquals("2a", list2.get(0));
+ Assert.assertEquals("2a", list2.get(1));
+ Assert.assertEquals("2b", list2.get(2));
+
+ SpillableTestUtils.checkOutOfBounds(list1, 4);
+ SpillableTestUtils.checkOutOfBounds(list2, 3);
+
+ list1.endWindow();
+ list2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+
+ SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2a"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b"));
+
+ windowId++;
+ store.beginWindow(windowId);
+ list1.beginWindow(windowId);
+ list2.beginWindow(windowId);
+
+ Assert.assertEquals(4, list1.size());
+ Assert.assertEquals(3, list2.size());
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("a", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("c", list1.get(3));
+
+ Assert.assertEquals("2a", list2.get(0));
+ Assert.assertEquals("2a", list2.get(1));
+ Assert.assertEquals("2b", list2.get(2));
+
+ list1.add("tt");
+ list1.add("ab");
+ list1.add("99");
+ list1.add("oo");
+
+ list2.add("2tt");
+ list2.add("2ab");
+
+ Assert.assertEquals("tt", list1.get(4));
+ Assert.assertEquals("ab", list1.get(5));
+ Assert.assertEquals("99", list1.get(6));
+ Assert.assertEquals("oo", list1.get(7));
+
+ Assert.assertEquals("2tt", list2.get(3));
+ Assert.assertEquals("2ab", list2.get(4));
+
+ list1.set(1, "111");
+ list2.set(1, "2111");
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("111", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("c", list1.get(3));
+ Assert.assertEquals("tt", list1.get(4));
+ Assert.assertEquals("ab", list1.get(5));
+ Assert.assertEquals("99", list1.get(6));
+ Assert.assertEquals("oo", list1.get(7));
+
+ Assert.assertEquals("2a", list2.get(0));
+ Assert.assertEquals("2111", list2.get(1));
+ Assert.assertEquals("2b", list2.get(2));
+ Assert.assertEquals("2tt", list2.get(3));
+ Assert.assertEquals("2ab", list2.get(4));
+
+ list1.endWindow();
+ list2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list1.beginWindow(windowId);
+ list2.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99"));
+ SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo"));
+
+ SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2111"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 3, Lists.newArrayList("2tt"));
+ SpillableTestUtils.checkValue(store, 0L, ID2, 4, Lists.newArrayList("2ab"));
+
+ list1.endWindow();
+ list2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ list1.teardown();
+ list2.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void recoveryManagedStateTest()
+ {
+ SpillableStateStore store = testMeta.store;
+
+ SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+ new SerdeStringSlice(), 3);
+
+ store.setup(testMeta.operatorContext);
+ list.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ SpillableTestUtils.checkOutOfBounds(list, 0);
+
+ list.add("a");
+ list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+ Assert.assertEquals(8, list.size());
+
+ list.endWindow();
+ store.endWindow();
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ list.add("tt");
+ list.add("ab");
+ list.add("99");
+ list.add("oo");
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ list.set(1, "111");
+ list.set(3, "222");
+ list.set(5, "333");
+ list.set(11, "444");
+
+ list.endWindow();
+ store.endWindow();
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ list.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ long activationWindow = windowId;
+ SpillableArrayListImpl<String> clonedList = KryoCloneUtils.cloneObject(list);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ list.set(1, "111111");
+ list.set(3, "222222");
+ list.add("xyz");
+
+ list.endWindow();
+ store.endWindow();
+
+ list.teardown();
+ store.teardown();
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+ attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+ Context.OperatorContext context =
+ new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+ list = clonedList;
+ store = clonedList.getStore();
+
+ store.setup(context);
+ list.setup(context);
+
+ windowId = activationWindow + 1L;
+ store.beginWindow(windowId);
+ list.beginWindow(windowId);
+
+ Assert.assertEquals("a", list.get(0));
+ Assert.assertEquals("111", list.get(1));
+ Assert.assertEquals("b", list.get(2));
+ Assert.assertEquals("222", list.get(3));
+ Assert.assertEquals("d", list.get(4));
+ Assert.assertEquals("333", list.get(5));
+ Assert.assertEquals("f", list.get(6));
+ Assert.assertEquals("g", list.get(7));
+ Assert.assertEquals("tt", list.get(8));
+ Assert.assertEquals("ab", list.get(9));
+ Assert.assertEquals("99", list.get(10));
+ Assert.assertEquals("444", list.get(11));
+ Assert.assertEquals(12, list.size());
+
+ list.endWindow();
+ store.endWindow();
+
+ list.teardown();
+ store.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
new file mode 100644
index 0000000..42d7d20
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteArrayListMultimapImplTest
+{
+ public static final byte[] ID1 = new byte[]{(byte)0};
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ @Test
+ public void simpleMultiKeyTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleMultiKeyTestHelper(store);
+ }
+
+ @Test
+ public void simpleMultiKeyManagedStateTest()
+ {
+ simpleMultiKeyTestHelper(testMeta.store);
+ }
+
+ public void simpleMultiKeyTestHelper(SpillableStateStore store)
+ {
+ SpillableByteArrayListMultimapImpl<String, String> map =
+ new SpillableByteArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(),
+ new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ long nextWindowId = 0L;
+ nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+ nextWindowId++;
+
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ Assert.assertEquals(1, map.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ nextWindowId++;
+ nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+ nextWindowId++;
+
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ Assert.assertEquals(2, map.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ nextWindowId++;
+ simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
+
+ nextWindowId++;
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ Assert.assertEquals(3, map.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ map.teardown();
+ store.teardown();
+ }
+
+ public long simpleMultiKeyTestHelper(SpillableStateStore store,
+ SpillableByteArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
+ {
+ SerdeStringSlice serdeString = new SerdeStringSlice();
+ SerdeIntSlice serdeInt = new SerdeIntSlice();
+
+ Slice keySlice = serdeString.serialize(key);
+
+ byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+ nextWindowId++;
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ Assert.assertNull(map.get(key));
+
+ Assert.assertFalse(map.containsKey(key));
+
+ map.put(key, "a");
+
+ Assert.assertTrue(map.containsKey(key));
+
+ List<String> list1 = map.get(key);
+ Assert.assertEquals(1, list1.size());
+
+ Assert.assertEquals("a", list1.get(0));
+
+ list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+ Assert.assertEquals(8, list1.size());
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("a", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("c", list1.get(3));
+ Assert.assertEquals("d", list1.get(4));
+ Assert.assertEquals("e", list1.get(5));
+ Assert.assertEquals("f", list1.get(6));
+ Assert.assertEquals("g", list1.get(7));
+
+ map.endWindow();
+ store.endWindow();
+
+ nextWindowId++;
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ SpillableTestUtils.checkValue(store, 0L,
+ SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
+
+ SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+ "f", "g"));
+
+ List<String> list2 = map.get(key);
+
+ Assert.assertEquals(8, list2.size());
+
+ Assert.assertEquals("a", list2.get(0));
+ Assert.assertEquals("a", list2.get(1));
+ Assert.assertEquals("b", list2.get(2));
+ Assert.assertEquals("c", list2.get(3));
+ Assert.assertEquals("d", list2.get(4));
+ Assert.assertEquals("e", list2.get(5));
+ Assert.assertEquals("f", list2.get(6));
+ Assert.assertEquals("g", list2.get(7));
+
+ list2.add("tt");
+ list2.add("ab");
+ list2.add("99");
+ list2.add("oo");
+
+ Assert.assertEquals("tt", list2.get(8));
+ Assert.assertEquals("ab", list2.get(9));
+ Assert.assertEquals("99", list2.get(10));
+ Assert.assertEquals("oo", list2.get(11));
+
+ Assert.assertEquals(12, list2.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ nextWindowId++;
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ Assert.assertEquals(12, list2.size());
+
+ SpillableTestUtils.checkValue(store, 0L,
+ SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+ SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+ "f", "g", "tt", "ab", "99", "oo"));
+
+ List<String> list3 = map.get(key);
+
+ list3.set(1, "111");
+ list3.set(3, "222");
+ list3.set(5, "333");
+ list3.set(11, "444");
+
+ Assert.assertEquals("a", list3.get(0));
+ Assert.assertEquals("111", list3.get(1));
+ Assert.assertEquals("b", list3.get(2));
+ Assert.assertEquals("222", list3.get(3));
+ Assert.assertEquals("d", list3.get(4));
+ Assert.assertEquals("333", list3.get(5));
+ Assert.assertEquals("f", list3.get(6));
+ Assert.assertEquals("g", list3.get(7));
+ Assert.assertEquals("tt", list3.get(8));
+ Assert.assertEquals("ab", list3.get(9));
+ Assert.assertEquals("99", list3.get(10));
+ Assert.assertEquals("444", list3.get(11));
+
+ Assert.assertEquals(12, list2.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ nextWindowId++;
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ SpillableTestUtils.checkValue(store, 0L,
+ SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+ SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333",
+ "f", "g", "tt", "ab", "99", "444"));
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(nextWindowId);
+
+ return nextWindowId;
+ }
+
+ @Test
+ public void recoveryTestWithManagedState()
+ {
+ SpillableStateStore store = testMeta.store;
+
+ SpillableByteArrayListMultimapImpl<String, String> map =
+ new SpillableByteArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ long nextWindowId = 0L;
+ nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+ long activationWindow = nextWindowId;
+ nextWindowId++;
+
+ SpillableByteArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
+ store.checkpointed(nextWindowId);
+ store.committed(nextWindowId);
+
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ List<String> list1 = map.get("a");
+
+ Assert.assertEquals(12, list1.size());
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("111", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("222", list1.get(3));
+ Assert.assertEquals("d", list1.get(4));
+ Assert.assertEquals("333", list1.get(5));
+ Assert.assertEquals("f", list1.get(6));
+ Assert.assertEquals("g", list1.get(7));
+ Assert.assertEquals("tt", list1.get(8));
+ Assert.assertEquals("ab", list1.get(9));
+ Assert.assertEquals("99", list1.get(10));
+ Assert.assertEquals("444", list1.get(11));
+
+ list1.add("111");
+
+ Assert.assertEquals("a", list1.get(0));
+ Assert.assertEquals("111", list1.get(1));
+ Assert.assertEquals("b", list1.get(2));
+ Assert.assertEquals("222", list1.get(3));
+ Assert.assertEquals("d", list1.get(4));
+ Assert.assertEquals("333", list1.get(5));
+ Assert.assertEquals("f", list1.get(6));
+ Assert.assertEquals("g", list1.get(7));
+ Assert.assertEquals("tt", list1.get(8));
+ Assert.assertEquals("ab", list1.get(9));
+ Assert.assertEquals("99", list1.get(10));
+ Assert.assertEquals("444", list1.get(11));
+ Assert.assertEquals("111", list1.get(12));
+
+ Assert.assertEquals(13, list1.size());
+
+ map.endWindow();
+ store.endWindow();
+
+ map.teardown();
+ store.teardown();
+
+ map = clonedMap;
+ store = map.getStore();
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+ attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+ Context.OperatorContext context =
+ new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+ store.setup(context);
+ map.setup(context);
+
+ store.beginWindow(nextWindowId);
+ map.beginWindow(nextWindowId);
+
+ SerdeStringSlice serdeString = new SerdeStringSlice();
+ Slice keySlice = serdeString.serialize("a");
+ byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+ SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
+ "333", "f", "g", "tt", "ab", "99", "444"));
+
+ Assert.assertEquals(1, map.size());
+ Assert.assertEquals(12, map.get("a").size());
+
+ map.endWindow();
+ store.endWindow();
+
+ map.teardown();
+ store.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
new file mode 100644
index 0000000..63f7b79
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableByteMapImplTest
+{
+ public static final byte[] ID1 = new byte[]{(byte)0};
+ public static final byte[] ID2 = new byte[]{(byte)1};
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ @Test
+ public void simpleGetAndPutTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleGetAndPutTestHelper(store);
+ }
+
+ @Test
+ public void simpleGetAndPutManagedStateTest()
+ {
+ simpleGetAndPutTestHelper(testMeta.store);
+ }
+
+ private void simpleGetAndPutTestHelper(SpillableStateStore store)
+ {
+ SerdeStringSlice sss = new SerdeStringSlice();
+
+ SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L,
+ new SerdeStringSlice(),
+ new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ Assert.assertEquals(0, map.size());
+
+ map.put("a", "1");
+ map.put("b", "2");
+ map.put("c", "3");
+
+ Assert.assertEquals(3, map.size());
+
+ Assert.assertEquals("1", map.get("a"));
+ Assert.assertEquals("2", map.get("b"));
+ Assert.assertEquals("3", map.get("c"));
+ Assert.assertEquals(null, map.get("d"));
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+ Assert.assertEquals(3, map.size());
+
+ Assert.assertEquals("1", map.get("a"));
+ Assert.assertEquals("2", map.get("b"));
+ Assert.assertEquals("3", map.get("c"));
+ Assert.assertEquals(null, map.get("d"));
+
+ map.put("d", "4");
+ map.put("e", "5");
+ map.put("f", "6");
+
+ Assert.assertEquals(6, map.size());
+
+ Assert.assertEquals("4", map.get("d"));
+ Assert.assertEquals("5", map.get("e"));
+ Assert.assertEquals("6", map.get("f"));
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+ SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ map.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void simpleRemoveTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleRemoveTestHelper(store);
+ }
+
+ @Test
+ public void simpleRemoveManagedStateTest()
+ {
+ simpleRemoveTestHelper(testMeta.store);
+ }
+
+ private void simpleRemoveTestHelper(SpillableStateStore store)
+ {
+ SerdeStringSlice sss = new SerdeStringSlice();
+
+ SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L,
+ new SerdeStringSlice(),
+ new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ Assert.assertEquals(0, map.size());
+
+ map.put("a", "1");
+ map.put("b", "2");
+ map.put("c", "3");
+
+ Assert.assertEquals(3, map.size());
+
+ map.remove("b");
+ map.remove("c");
+
+ Assert.assertEquals("1", map.get("a"));
+ Assert.assertEquals(null, map.get("b"));
+ Assert.assertEquals(null, map.get("c"));
+ Assert.assertEquals(null, map.get("d"));
+
+ Assert.assertEquals(1, map.size());
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ Assert.assertEquals(1, map.size());
+
+ Assert.assertEquals("1", map.get("a"));
+ Assert.assertEquals(null, map.get("b"));
+ Assert.assertEquals(null, map.get("c"));
+ Assert.assertEquals(null, map.get("d"));
+
+ map.put("d", "4");
+ map.put("e", "5");
+ map.put("f", "6");
+
+ Assert.assertEquals(4, map.size());
+
+ Assert.assertEquals("4", map.get("d"));
+ Assert.assertEquals("5", map.get("e"));
+ Assert.assertEquals("6", map.get("f"));
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+ SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+ map.remove("a");
+ map.remove("d");
+ Assert.assertEquals(null, map.get("a"));
+ Assert.assertEquals(null, map.get("b"));
+ Assert.assertEquals(null, map.get("c"));
+ Assert.assertEquals(null, map.get("d"));
+ Assert.assertEquals("5", map.get("e"));
+ Assert.assertEquals("6", map.get("f"));
+ Assert.assertEquals(null, map.get("g"));
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+ SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+ SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+ SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+ map.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ map.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void multiMapPerBucketTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ multiMapPerBucketTestHelper(store);
+ }
+
+ @Test
+ public void multiMapPerBucketManagedStateTest()
+ {
+ multiMapPerBucketTestHelper(testMeta.store);
+ }
+
+ public void multiMapPerBucketTestHelper(SpillableStateStore store)
+ {
+ SerdeStringSlice sss = new SerdeStringSlice();
+
+ SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(store, ID1, 0L,
+ new SerdeStringSlice(),
+ new SerdeStringSlice());
+ SpillableByteMapImpl<String, String> map2 = new SpillableByteMapImpl<>(store, ID2, 0L,
+ new SerdeStringSlice(),
+ new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ map1.setup(testMeta.operatorContext);
+ map2.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ map1.beginWindow(windowId);
+ map2.beginWindow(windowId);
+
+ map1.put("a", "1");
+
+ Assert.assertEquals("1", map1.get("a"));
+ Assert.assertEquals(null, map2.get("a"));
+
+ map2.put("a", "a1");
+
+ Assert.assertEquals("1", map1.get("a"));
+ Assert.assertEquals("a1", map2.get("a"));
+
+ map1.put("b", "2");
+ map2.put("c", "3");
+
+ Assert.assertEquals("1", map1.get("a"));
+ Assert.assertEquals("2", map1.get("b"));
+
+ Assert.assertEquals("a1", map2.get("a"));
+ Assert.assertEquals(null, map2.get("b"));
+ Assert.assertEquals("3", map2.get("c"));
+
+ map1.endWindow();
+ map2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map1.beginWindow(windowId);
+ map2.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+ SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
+ SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
+
+ map1.remove("a");
+
+ Assert.assertEquals(null, map1.get("a"));
+ Assert.assertEquals("a1", map2.get("a"));
+
+ map1.endWindow();
+ map2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+
+ windowId++;
+ store.beginWindow(windowId);
+ map1.beginWindow(windowId);
+ map2.beginWindow(windowId);
+
+ SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+ SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+
+ map1.endWindow();
+ map2.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+
+ map1.teardown();
+ map2.teardown();
+ store.teardown();
+ }
+
+ @Test
+ public void recoveryWithManagedStateTest() throws Exception
+ {
+ SerdeStringSlice sss = new SerdeStringSlice();
+
+ SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(testMeta.store, ID1, 0L,
+ new SerdeStringSlice(),
+ new SerdeStringSlice());
+
+ testMeta.store.setup(testMeta.operatorContext);
+ map1.setup(testMeta.operatorContext);
+
+ testMeta.store.beginWindow(0);
+ map1.beginWindow(0);
+ map1.put("x", "1");
+ map1.put("y", "2");
+ map1.put("z", "3");
+ map1.put("zz", "33");
+ Assert.assertEquals(4, map1.size());
+ map1.endWindow();
+ testMeta.store.endWindow();
+
+ testMeta.store.beginWindow(1);
+ map1.beginWindow(1);
+ Assert.assertEquals(4, map1.size());
+ map1.put("x", "4");
+ map1.put("y", "5");
+ map1.remove("zz");
+ Assert.assertEquals(3, map1.size());
+ map1.endWindow();
+ testMeta.store.endWindow();
+ testMeta.store.beforeCheckpoint(1);
+ testMeta.store.checkpointed(1);
+
+ SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
+
+ testMeta.store.beginWindow(2);
+ map1.beginWindow(2);
+ Assert.assertEquals(3, map1.size());
+ map1.put("x", "6");
+ map1.put("y", "7");
+ map1.put("w", "8");
+ Assert.assertEquals(4, map1.size());
+ map1.endWindow();
+ testMeta.store.endWindow();
+
+ // simulating crash here
+ map1.teardown();
+ testMeta.store.teardown();
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+ attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+ Context.OperatorContext context =
+ new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+ map1 = clonedMap1;
+ map1.getStore().setup(context);
+ map1.setup(testMeta.operatorContext);
+
+ map1.getStore().beginWindow(2);
+ map1.beginWindow(2);
+ Assert.assertEquals(3, map1.size());
+ Assert.assertEquals("4", map1.get("x"));
+ Assert.assertEquals("5", map1.get("y"));
+ Assert.assertEquals("3", map1.get("z"));
+ map1.endWindow();
+ map1.getStore().endWindow();
+
+ map1.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
new file mode 100644
index 0000000..67db6ba
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+public class SpillableComplexComponentImplTest
+{
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ @Test
+ public void simpleIntegrationTest()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleIntegrationTestHelper(store);
+ }
+
+ @Test
+ public void simpleIntegrationManagedStateTest()
+ {
+ simpleIntegrationTestHelper(testMeta.store);
+ }
+
+ public void simpleIntegrationTestHelper(SpillableStateStore store)
+ {
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
+
+ Spillable.SpillableComponent scList =
+ (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice());
+ Spillable.SpillableComponent scMap =
+ (Spillable.SpillableComponent)sccImpl.newSpillableByteMap(0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+ sccImpl.setup(testMeta.operatorContext);
+
+ sccImpl.beginWindow(0L);
+
+ sccImpl.endWindow();
+
+ sccImpl.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
new file mode 100644
index 0000000..00ea58d
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This class contains utility methods that can be used by Spillable data structure unit tests.
+ */
+public class SpillableTestUtils
+{
+ public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
+ public static SerdeListSlice<String> SERDE_STRING_LIST_SLICE = new SerdeListSlice(new SerdeStringSlice());
+
+ private SpillableTestUtils()
+ {
+ //Shouldn't instantiate this
+ }
+
+ static class TestMeta extends TestWatcher
+ {
+ ManagedStateSpillableStateStore store;
+ Context.OperatorContext operatorContext;
+ String applicationPath;
+
+ @Override
+ protected void starting(Description description)
+ {
+ TestUtils.deleteTargetTestClassFolder(description);
+ store = new ManagedStateSpillableStateStore();
+ applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+ ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+ operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ TestUtils.deleteTargetTestClassFolder(description);
+ }
+ }
+
+ public static Slice getKeySlice(byte[] id, String key)
+ {
+ return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key));
+ }
+
+ public static Slice getKeySlice(byte[] id, int index, String key)
+ {
+ return SliceUtils.concatenate(id,
+ SliceUtils.concatenate(GPOUtils.serializeInt(index),
+ SERDE_STRING_SLICE.serialize(key)));
+ }
+
+ public static void checkValue(SpillableStateStore store, long bucketId, String key,
+ byte[] prefix, String expectedValue)
+ {
+ checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer,
+ expectedValue, 0, SERDE_STRING_SLICE);
+ }
+
+ public static void checkValue(SpillableStateStore store, long bucketId,
+ byte[] prefix, int index, List<String> expectedValue)
+ {
+ checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0,
+ SERDE_STRING_LIST_SLICE);
+ }
+
+ public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
+ T expectedValue, int offset, Serde<T, Slice> serde)
+ {
+ Slice slice = store.getSync(bucketId, new Slice(bytes));
+
+ if (slice == null || slice.length == 0) {
+ if (expectedValue != null) {
+ Assert.assertEquals(expectedValue, slice);
+ } else {
+ return;
+ }
+ }
+
+ T string = serde.deserialize(slice, new MutableInt(offset));
+
+ Assert.assertEquals(expectedValue, string);
+ }
+
+ public static void checkOutOfBounds(SpillableArrayListImpl<String> list, int index)
+ {
+ boolean exceptionThrown = false;
+
+ try {
+ list.get(index);
+ } catch (IndexOutOfBoundsException ex) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
new file mode 100644
index 0000000..8033a7d
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Created by tfarkas on 6/4/16.
+ */
+public class TimeBasedPriorityQueueTest
+{
+ @Test
+ public void simpleInsertAndRemoveTest()
+ {
+ TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+ queue.upSert("a");
+ queue.remove("a");
+
+ overRemoveTest(queue, 1);
+ }
+
+ @Test
+ public void simpleInsertAndLRURemoveTest()
+ {
+ TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+ queue.upSert("a");
+
+ Set<String> set = queue.removeLRU(1);
+
+ Assert.assertEquals(Sets.newHashSet("a"), set);
+ }
+
+ @Test
+ public void simpleLRUTest() throws Exception
+ {
+ TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+
+ queue.upSert("a");
+ Thread.sleep(1L);
+
+ queue.upSert("b");
+ Thread.sleep(1L);
+
+ queue.upSert("a");
+
+ Set<String> set = queue.removeLRU(1);
+
+ Assert.assertEquals(Sets.newHashSet("b"), set);
+ }
+
+ @Test
+ public void complexLRUTest() throws Exception
+ {
+ //0, 3, 6, 9
+ //1, 4, 7
+ //2, 5, 8
+
+ TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+
+ for (int counter = 0; counter < 10; counter++) {
+ String val = "" + counter;
+
+ queue.upSert(val);
+ Thread.sleep(1L);
+ }
+
+ for (int counter = 0; counter < 10; counter++) {
+ if (counter % 3 != 1) {
+ continue;
+ }
+
+ String val = "" + counter;
+ queue.remove(val);
+ }
+
+ for (int counter = 0; counter < 10; counter++) {
+ if (counter % 3 != 0) {
+ continue;
+ }
+
+ String val = "" + counter;
+ queue.upSert(val);
+ Thread.sleep(1L);
+ }
+
+ //2, 5, 8, 0, 3, 6, 9
+
+ overRemoveTest(queue, 8);
+
+ Set<String> expiredValues = queue.removeLRU(3);
+
+ Assert.assertEquals(Sets.newHashSet("2", "5", "8"), expiredValues);
+
+ overRemoveTest(queue, 6);
+
+ expiredValues = queue.removeLRU(4);
+
+ Assert.assertEquals(Sets.newHashSet("0", "3", "6", "9"), expiredValues);
+ }
+
+ private void overRemoveTest(TimeBasedPriorityQueue<String> queue, int removeCount)
+ {
+ boolean exceptionThrown = false;
+
+ try {
+ queue.removeLRU(removeCount);
+ } catch (IllegalArgumentException e) {
+ exceptionThrown = true;
+ }
+
+ Assert.assertTrue(exceptionThrown);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
new file mode 100644
index 0000000..b1710dd
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class WindowBoundedMapCacheTest
+{
+ @Test
+ public void simplePutGetTest()
+ {
+ WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>();
+
+ long windowId = 0L;
+
+ windowId++;
+
+ cache.put("1", "a");
+ Assert.assertEquals("a", cache.get("1"));
+
+ cache.endWindow();
+
+ windowId++;
+
+ Assert.assertEquals("a", cache.get("1"));
+
+ cache.endWindow();
+ }
+
+ @Test
+ public void getChangedGetRemovedTest()
+ {
+ WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>();
+
+ long windowId = 0L;
+
+ windowId++;
+
+ cache.put("1", "a");
+ cache.put("2", "b");
+
+ Assert.assertEquals(Sets.newHashSet("1", "2"), cache.getChangedKeys());
+ Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+ cache.endWindow();
+
+ windowId++;
+
+ cache.remove("1");
+
+ Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+ Assert.assertEquals(Sets.newHashSet("1"), cache.getRemovedKeys());
+
+ Assert.assertEquals(null, cache.get("1"));
+ Assert.assertEquals("b", cache.get("2"));
+
+ cache.endWindow();
+
+ windowId++;
+
+ Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+ Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+ cache.endWindow();
+ }
+
+ @Test
+ public void expirationTest() throws Exception
+ {
+ WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>(2);
+
+ long windowId = 0L;
+
+ windowId++;
+
+ cache.put("1", "a");
+ Thread.sleep(1L);
+ cache.put("2", "b");
+ Thread.sleep(1L);
+ cache.put("3", "c");
+
+ Assert.assertEquals(Sets.newHashSet("1", "2", "3"), cache.getChangedKeys());
+
+ cache.endWindow();
+
+ windowId++;
+
+ Assert.assertEquals(null, cache.get("1"));
+ Assert.assertEquals("b", cache.get("2"));
+ Assert.assertEquals("c", cache.get("3"));
+
+ Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+ Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+ cache.endWindow();
+ }
+}