You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/22 16:46:49 UTC
apex-malhar git commit: APEXMALHAR-2248 #resolve Added interfaces and
implementations of SpillableSet and SpillableSetMultimap
Repository: apex-malhar
Updated Branches:
refs/heads/master d713e521e -> 7ac4a0ed7
APEXMALHAR-2248 #resolve Added interfaces and implementations of SpillableSet and SpillableSetMultimap
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7ac4a0ed
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7ac4a0ed
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7ac4a0ed
Branch: refs/heads/master
Commit: 7ac4a0ed759b06c382fa54fc148114709c1452f2
Parents: d713e52
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:58:12 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Sep 22 08:56:43 2016 -0700
----------------------------------------------------------------------
.../malhar/lib/state/spillable/Spillable.java | 35 +-
.../spillable/SpillableComplexComponent.java | 32 +-
.../SpillableComplexComponentImpl.java | 9 +
.../lib/state/spillable/SpillableSetImpl.java | 352 +++++++++++++++++++
.../spillable/SpillableSetMultimapImpl.java | 321 +++++++++++++++++
.../malhar/lib/utils/serde/SerdeKryoSlice.java | 100 ++++++
.../malhar/lib/utils/serde/SerdeLongSlice.java | 54 +++
.../malhar/lib/utils/serde/SerdePairSlice.java | 89 +++++
.../state/spillable/SpillableSetImplTest.java | 148 ++++++++
.../spillable/SpillableSetMultimapImplTest.java | 298 ++++++++++++++++
.../lib/utils/serde/SerdeKryoSliceTest.java | 79 +++++
.../lib/utils/serde/SerdePairSliceTest.java | 44 +++
12 files changed, 1546 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 4c9b997..849389b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -21,9 +21,11 @@ package org.apache.apex.malhar.lib.state.spillable;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multiset;
+import com.google.common.collect.SetMultimap;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context.OperatorContext;
@@ -37,7 +39,7 @@ public interface Spillable
{
/**
* This represents a spillable {@link java.util.List}. The underlying implementation
- * of this list is similar to that of an {@link java.util.ArrayList}. User's that receive an
+ * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
* @param <T> The type of the data stored in the {@link SpillableArrayList}.
@@ -47,9 +49,19 @@ public interface Spillable
}
/**
+ * This represents a spillable {@link java.util.Set}. Users that receive an
+ * implementation of this interface don't need to worry about propagating operator call-backs
+ * to the data structure.
+ * @param <T> The type of the data stored in the {@link SpillableSet}.
+ */
+ interface SpillableSet<T> extends Set<T>
+ {
+ }
+
+ /**
* This represents a spillable {@link java.util.Map}. Implementations make
* some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
- * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an
+ * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
* @param <K> The type of the keys.
@@ -62,7 +74,7 @@ public interface Spillable
/**
* This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. Implementations make
* some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
- * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an
+ * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
* @param <K> The type of the keys.
@@ -73,9 +85,22 @@ public interface Spillable
}
/**
+ * This represents a spillable {@link com.google.common.collect.SetMultimap} implementation. Implementations make
+ * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
+ * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
+ * implementation of this interface don't need to worry about propagating operator call-backs
+ * to the data structure.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ */
+ interface SpillableSetMultimap<K, V> extends SetMultimap<K, V>
+ {
+ }
+
+ /**
* This represents a spillable {@link com.google.common.collect.Multiset} implementation. Implementations make
* some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is
- * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). User's that receive an
+ * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs to the data structure.
*/
interface SpillableByteMultiset<T> extends Multiset<T>
@@ -83,7 +108,7 @@ public interface Spillable
}
/**
- * This represents a spillable {@link java.util.Queue} implementation. User's that receive an
+ * This represents a spillable {@link java.util.Queue} implementation. Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
* @param <T> The type of the data stored in the queue.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c63c7ef..e4836c4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -39,7 +39,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* This is a method for creating a {@link SpillableArrayList}. This method
* auto-generates an identifier for the data structure.
* @param <T> The type of data stored in the {@link SpillableArrayList}.
- * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
* @return A {@link SpillableArrayList}.
*/
@@ -49,7 +49,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* This is a method for creating a {@link SpillableArrayList}.
* @param <T> The type of data stored in the {@link SpillableArrayList}.
* @param identifier The identifier for this {@link SpillableArrayList}.
- * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
* @return A {@link SpillableArrayList}.
*/
@@ -60,7 +60,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* auto-generates an identifier for the data structure.
* @param <K> The type of the keys.
* @param <V> The type of the values.
- * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the map's values.
* @return A {@link SpillableByteMap}.
@@ -73,7 +73,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param <K> The type of the keys.
* @param <V> The type of the values.
* @param identifier The identifier for this {@link SpillableByteMap}.
- * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the map's values.
* @return A {@link SpillableByteMap}.
@@ -86,7 +86,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* auto-generates an identifier for the data structure.
* @param <K> The type of the keys.
* @param <V> The type of the values in the map's lists.
- * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableByteArrayListMultimap}.
@@ -99,7 +99,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param <K> The type of the keys.
* @param <V> The type of the values in the map's lists.
* @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
- * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableByteArrayListMultimap}.
@@ -109,10 +109,22 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
Serde<V, Slice> serdeValue);
/**
+ * This is a method for creating a {@link SpillableSetMultimap}.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values in the map's sets.
+ * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to.
+ * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+ * @param serdeValue The Serializer/Deserializer to use for the values in the map's sets.
+ * @return A {@link SpillableSetMultimap}.
+ */
+ <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
+ Slice> serdeKey, Serde<V, Slice> serdeValue);
+
+ /**
* This is a method for creating a {@link SpillableByteMultiset}. This method
* auto-generates an identifier for the data structure.
* @param <T> The type of the elements.
- * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
* @return A {@link SpillableByteMultiset}.
*/
@@ -122,7 +134,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* This is a method for creating a {@link SpillableByteMultiset}.
* @param <T> The type of the elements.
* @param identifier The identifier for this {@link SpillableByteMultiset}.
- * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
* @return A {@link SpillableByteMultiset}.
*/
@@ -132,7 +144,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* This is a method for creating a {@link SpillableQueue}. This method
* auto-generates an identifier for the data structure.
* @param <T> The type of the data stored in the {@link SpillableQueue}.
- * @param bucket The bucket that this {@link SpillableQueue} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
@@ -142,7 +154,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* This is a method for creating a {@link SpillableQueue}.
* @param <T> The type of the data stored in the {@link SpillableQueue}.
* @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
- * @param bucket The bucket that this {@link SpillableQueue} will be spilled too.
+ * @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aeb02c5..9c3defc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -119,6 +119,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
return map;
}
+ public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
+ Slice> serdeKey, Serde<V, Slice> serdeValue)
+ {
+ SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
+ identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ componentList.add(map);
+ return map;
+ }
+
public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
new file mode 100644
index 0000000..122cd2d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link java.util.Set} backed by a {@link SpillableStateStore}.
+ * @param <T> The type of object stored in the {@link SpillableSetImpl}.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable.SpillableComponent
+{
+ private static class ListNode<T>
+ {
+ ListNode()
+ {
+ }
+
+ ListNode(boolean valid, T next)
+ {
+ this.valid = valid;
+ this.next = next;
+ }
+
+ boolean valid;
+ T next;
+ }
+
+  /**
+   * Serde for {@link ListNode}. The serialized form is a single flag byte
+   * (1 when the node is valid, 0 otherwise) followed by the bytes produced by the
+   * wrapped element serde for the node's {@code next} reference.
+   *
+   * @param <T> The type of the element referenced by the node.
+   */
+  public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+  {
+    private Serde<T, Slice> serde;
+
+    /**
+     * @param serde The serde used for the {@code next} element of each node; must not be null.
+     */
+    public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+    {
+      this.serde = Preconditions.checkNotNull(serde);
+    }
+
+    /**
+     * Serializes the node as the validity flag byte followed by the serialized {@code next} element.
+     *
+     * @param object The node to serialize.
+     * @return A new slice holding the flag byte and the serialized {@code next} element.
+     */
+    @Override
+    public Slice serialize(ListNode<T> object)
+    {
+      Slice nextSlice = serde.serialize(object.next);
+
+      // The flag is written directly instead of being copied out of shared static slices.
+      byte[] bytes = new byte[1 + nextSlice.length];
+      bytes[0] = (byte)(object.valid ? 1 : 0);
+      System.arraycopy(nextSlice.buffer, nextSlice.offset, bytes, 1, nextSlice.length);
+
+      return new Slice(bytes);
+    }
+
+    /**
+     * Reads the flag byte at {@code offset} within the slice's buffer, advances {@code offset}
+     * by one and lets the wrapped serde read the {@code next} element from there.
+     *
+     * @param slice The slice to read from.
+     * @param offset The position to start reading at; advanced past the consumed flag byte.
+     * @return The deserialized node.
+     */
+    @Override
+    public ListNode<T> deserialize(Slice slice, MutableInt offset)
+    {
+      ListNode<T> result = new ListNode<>();
+      result.valid = slice.buffer[offset.intValue()] != 0;
+      offset.add(1);
+      result.next = serde.deserialize(slice, offset);
+      return result;
+    }
+
+    /**
+     * Deserializes a node starting at position 0.
+     *
+     * @param object The slice to read from.
+     * @return The deserialized node.
+     */
+    @Override
+    public ListNode<T> deserialize(Slice object)
+    {
+      return deserialize(object, new MutableInt(0));
+    }
+  }
+
+ @NotNull
+ private SpillableStateStore store;
+ @NotNull
+ private SpillableByteMapImpl<T, ListNode<T>> map;
+
+ private T head;
+ private int size;
+
+ private SpillableSetImpl()
+ {
+ //for kryo
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return store;
+ }
+
+  /**
+   * Creates a {@link SpillableSetImpl}.
+   * @param bucketId The Id of the bucket used to store this
+   * {@link SpillableSetImpl} in the provided {@link SpillableStateStore}.
+   * @param prefix The Id of this {@link SpillableSetImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param serde The {@link Serde} to use when serializing and deserializing data.
+   */
+  public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
+      @NotNull SpillableStateStore store,
+      @NotNull Serde<T, Slice> serde)
+  {
+    this.store = Preconditions.checkNotNull(store);
+
+    // The node serde is parameterized explicitly so the map's value type is checked by the compiler
+    // instead of going through a raw type.
+    map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice<T>(serde));
+  }
+
+ public void setSize(int size)
+ {
+ Preconditions.checkArgument(size >= 0);
+ this.size = size;
+ }
+
+ public void setHead(T head)
+ {
+ Preconditions.checkNotNull(head);
+ this.head = head;
+ }
+
+ public T getHead()
+ {
+ return head;
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ @Override
+ public boolean contains(Object o)
+ {
+ T t = (T)o;
+ ListNode<T> node = map.get(t);
+ return node != null && node.valid;
+ }
+
+  /**
+   * Returns an iterator that follows the chain of elements starting at {@code head}, skipping
+   * nodes whose {@code valid} flag is false. The end of the chain is the node whose
+   * {@code next} reference equals the node's own element.
+   *
+   * @return An iterator over the valid elements of this set.
+   */
+  @Override
+  public Iterator<T> iterator()
+  {
+    return new Iterator<T>()
+    {
+      // Next element to examine; null once next() has walked past the end of the chain.
+      T cur = head;
+      // Element most recently returned by next() that has not been removed yet; null otherwise.
+      T prev = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        while (cur != null) {
+          ListNode<T> node = map.get(cur);
+          if (node.valid) {
+            return true;
+          }
+          if (cur.equals(node.next)) {
+            // End of the chain reached on an invalid node.
+            break;
+          } else {
+            cur = node.next;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public T next()
+      {
+        while (cur != null) {
+          T candidate = cur;
+          ListNode<T> node = map.get(candidate);
+          // Advance before returning so the following call starts at the successor.
+          cur = candidate.equals(node.next) ? null : node.next;
+          if (node.valid) {
+            prev = candidate;
+            return candidate;
+          }
+        }
+        throw new NoSuchElementException();
+      }
+
+      /**
+       * Marks the element last returned by {@link #next()} as invalid and decrements the size.
+       *
+       * @throws IllegalStateException if next() has not been called yet, or remove() was
+       * already called after the last call to next().
+       */
+      @Override
+      public void remove()
+      {
+        if (prev == null) {
+          throw new IllegalStateException();
+        }
+        ListNode<T> node = map.get(prev);
+        node.valid = false;
+        map.put(prev, node);
+        size--;
+        // Forget the element so a repeated remove() cannot decrement the size a second time.
+        prev = null;
+      }
+    };
+  }
+
+ @Override
+ public Object[] toArray()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T1> T1[] toArray(T1[] t1s)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean add(T t)
+ {
+ Preconditions.checkArgument((size() + 1) > 0);
+ ListNode<T> node = map.get(t);
+ if (node == null) {
+ map.put(t, new ListNode<>(true, head == null ? t : head));
+ head = t;
+ size++;
+ return true;
+ } else if (!node.valid) {
+ node.valid = true;
+ map.put(t, node);
+ size++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+ T t = (T)o;
+ ListNode<T> node = map.get(t);
+ if (node == null || !node.valid) {
+ return false;
+ } else {
+ node.valid = false;
+ map.put(t, node);
+ size--;
+ return true;
+ }
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> collection)
+ {
+ for (Object item : collection) {
+ if (!contains(item)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+  /**
+   * Adds every element of the given collection to this set.
+   *
+   * @param collection The elements to add.
+   * @return true if at least one element was added, false if this set did not change.
+   */
+  @Override
+  public boolean addAll(Collection<? extends T> collection)
+  {
+    boolean changed = false;
+    for (T element: collection) {
+      changed |= add(element);
+    }
+
+    return changed;
+  }
+
+ @Override
+ public boolean removeAll(Collection<?> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear()
+ {
+ Iterator<T> it = iterator();
+ while (it.hasNext()) {
+ it.next();
+ it.remove();
+ }
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ map.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ map.beginWindow(windowId);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ map.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ map.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
new file mode 100644
index 0000000..951ef76
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of Guava's SetMultimap which spills data to a {@link SpillableStateStore}.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>,
+ Spillable.SpillableComponent
+{
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+ public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+ private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
+
+ @NotNull
+ private SpillableByteMapImpl<Slice, Pair<Integer, V>> map;
+ private SpillableStateStore store;
+ private byte[] identifier;
+ private long bucket;
+ private Serde<K, Slice> serdeKey;
+ private Serde<V, Slice> serdeValue;
+ private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
+
+ private SpillableSetMultimapImpl()
+ {
+ // for kryo
+ }
+
+ /**
+ * Creates a {@link SpillableSetMultimapImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
+ * @param bucket The Id of the bucket used to store this
+ * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+ */
+ public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+ Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.bucket = bucket;
+ this.serdeKey = Preconditions.checkNotNull(serdeKey);
+ this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+ map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return store;
+ }
+
+ @Override
+ public Set<V> get(@NotNull K key)
+ {
+ return getHelper(key);
+ }
+
+ private SpillableSetImpl<V> getHelper(@NotNull K key)
+ {
+ SpillableSetImpl<V> spillableSet = cache.get(key);
+
+ if (spillableSet == null) {
+ Slice keySlice = serdeKey.serialize(key);
+ Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+
+ if (meta == null) {
+ return null;
+ }
+
+ Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+ spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableSet.setSize(meta.getLeft());
+ spillableSet.setHead(meta.getRight());
+ }
+
+ cache.put(key, spillableSet);
+
+ return spillableSet;
+ }
+
+ @Override
+ public Set<K> keySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Multiset<K> keys()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<V> values()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entries()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Note that this always returns null because the set is no longer valid after this call
+ *
+ * @param key The key whose set of values is removed.
+ * @return null
+ */
+ @Override
+ public Set<V> removeAll(@NotNull Object key)
+ {
+ SpillableSetImpl<V> spillableSet = getHelper((K)key);
+ if (spillableSet != null) {
+ cache.remove((K)key);
+ Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ map.remove(keySlice);
+ spillableSet.clear();
+ removedSets.add(spillableSet);
+ }
+ return null;
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size()
+ {
+ // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key)
+ {
+ if (cache.contains((K)key)) {
+ return true;
+ }
+ Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ return map.containsKey(keySlice);
+ }
+
+ @Override
+ public boolean containsValue(@NotNull Object value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsEntry(Object key, Object value)
+ {
+ Set<V> set = get((K)key);
+ if (set == null) {
+ return false;
+ } else {
+ return set.contains(value);
+ }
+ }
+
+  /**
+   * Adds the value to the set stored under the given key. When no set exists for the key yet,
+   * a new {@link SpillableSetImpl} is created and registered in the cache.
+   *
+   * @param key The key to store the value under.
+   * @param value The value to add.
+   * @return true if the value was added, false if the key's set already contained it.
+   */
+  @Override
+  public boolean put(K key, V value)
+  {
+    SpillableSetImpl<V> spillableSet = getHelper(key);
+
+    if (spillableSet == null) {
+      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      cache.put(key, spillableSet);
+    }
+    // Report whether the set actually changed instead of unconditionally returning true.
+    return spillableSet.add(value);
+  }
+
+ @Override
+ public boolean remove(@NotNull Object key, @NotNull Object value)
+ {
+ Set<V> set = get((K)key);
+ if (set == null) {
+ return false;
+ } else {
+ return set.remove(value);
+ }
+ }
+
+ @Override
+ public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+ {
+ boolean changed = false;
+
+ for (V value: values) {
+ changed |= put(key, value);
+ }
+
+ return changed;
+ }
+
+ @Override
+ public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+ {
+ boolean changed = false;
+
+ for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+ changed |= put(entry.getKey(), entry.getValue());
+ }
+
+ return changed;
+ }
+
+ @Override
+ public Set<V> replaceValues(K key, Iterable<? extends V> values)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<K, Collection<V>> asMap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ map.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ map.beginWindow(windowId);
+ }
+
+  /**
+   * Ends the window: for every key the cache reports as changed, ends the window on its set and
+   * stores the set's size and head under the key's meta entry; ends the window on the sets
+   * removed through {@link #removeAll(Object)}; then ends the window on the cache and the map.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (K key: cache.getChangedKeys()) {
+
+      SpillableSetImpl<V> spillableSet = cache.get(key);
+      spillableSet.endWindow();
+
+      map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+          new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
+    }
+
+    for (SpillableSetImpl<V> removedSet : removedSets) {
+      removedSet.endWindow();
+    }
+    // Drop the removed sets once their window has ended; otherwise the list grows without bound
+    // and every removed set gets endWindow() called again in each later window.
+    removedSets.clear();
+
+    cache.endWindow();
+    map.endWindow();
+  }
+
+ @Override
+ public void teardown()
+ {
+ map.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
new file mode 100644
index 0000000..d4b9488
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * using Kryo makes the object being serialized rigid: the serialized form is tied to the structure of the class,
+ * so changing the class later may make previously serialized data unreadable.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class SerdeKryoSlice<T> implements Serde<T, Slice>
+{
+  // One Kryo instance per thread, because Kryo instances are not thread safe
+  private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+  {
+    @Override
+    protected Kryo initialValue()
+    {
+      Kryo kryo = new Kryo();
+      // configure kryo instance, customize settings
+      return kryo;
+    }
+  };
+
+  // When null, the class is written into the serialized form; otherwise only the object is written
+  private final Class<? extends T> clazz;
+
+  /**
+   * Creates a serde that writes the class together with each object, so that any type can be read back.
+   */
+  public SerdeKryoSlice()
+  {
+    this.clazz = null;
+  }
+
+  /**
+   * Creates a serde for objects of a known class. The class is not written into the serialized form.
+   *
+   * @param clazz The class used to read objects back
+   */
+  public SerdeKryoSlice(Class<? extends T> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  /**
+   * Serializes the given object with Kryo.
+   *
+   * @param object The object to serialize
+   * @return A slice containing the complete serialized form of the object
+   */
+  @Override
+  public Slice serialize(T object)
+  {
+    Kryo kryo = kryos.get();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    Output output = new Output(stream);
+    if (clazz == null) {
+      kryo.writeClassAndObject(output, object);
+    } else {
+      kryo.writeObject(output, object);
+    }
+    // A stream-backed Output drains its buffer into the stream whenever the buffer fills up, so
+    // output.toBytes() would only return the unflushed tail of a large object. Flush the remainder
+    // and take the complete serialized form from the stream instead.
+    output.flush();
+    return new Slice(stream.toByteArray());
+  }
+
+  /**
+   * Deserializes one object starting at the given offset within the slice.
+   *
+   * @param slice The slice holding the serialized data
+   * @param offset The position, relative to the start of the slice, at which to start reading. On return
+   *   it is advanced to the position of the first byte following the object that was read
+   * @return The deserialized object
+   */
+  @Override
+  public T deserialize(Slice slice, MutableInt offset)
+  {
+    byte[] bytes = slice.toByteArray();
+    Kryo kryo = kryos.get();
+    Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue());
+    T object;
+    if (clazz == null) {
+      @SuppressWarnings("unchecked")
+      T read = (T)kryo.readClassAndObject(input);
+      object = read;
+    } else {
+      object = kryo.readObject(input, clazz);
+    }
+    // Input.position() is the index in bytes of the next unread byte, which is exactly the offset a
+    // subsequent deserialize call has to start from.
+    offset.setValue(input.position());
+    return object;
+  }
+
+  /**
+   * Deserializes one object from the start of the slice.
+   *
+   * @param slice The slice holding the serialized data
+   * @return The deserialized object
+   */
+  @Override
+  public T deserialize(Slice slice)
+  {
+    return deserialize(slice, new MutableInt(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
new file mode 100644
index 0000000..6fe07d9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes longs.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class SerdeLongSlice implements Serde<Long, Slice>
+{
+  /**
+   * Serializes the given long into a new slice.
+   *
+   * @param object The value to serialize
+   * @return A slice wrapping the serialized bytes
+   */
+  @Override
+  public Slice serialize(Long object)
+  {
+    return new Slice(GPOUtils.serializeLong(object));
+  }
+
+  /**
+   * Reads a long from the slice and advances {@code offset} by 8.
+   *
+   * @param slice The slice to read from
+   * @param offset The position relative to the start of the slice at which the value begins
+   * @return The deserialized value
+   */
+  @Override
+  public Long deserialize(Slice slice, MutableInt offset)
+  {
+    // translate the slice-relative offset into an index of the backing buffer
+    final int start = slice.offset + offset.intValue();
+    final long value = GPOUtils.deserializeLong(slice.buffer, new MutableInt(start));
+    offset.add(8);
+    return value;
+  }
+
+  /**
+   * Reads a long from the start of the slice.
+   *
+   * @param object The slice to read from
+   * @return The deserialized value
+   */
+  @Override
+  public Long deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
new file mode 100644
index 0000000..59cf282
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link Serde} for pairs. The serialized form is the serialized left element immediately followed by
+ * the serialized right element.
+ */
+@InterfaceStability.Evolving
+public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
+{
+  @NotNull
+  private Serde<T1, Slice> serde1;
+  @NotNull
+  private Serde<T2, Slice> serde2;
+
+  private SerdePairSlice()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link SerdePairSlice}.
+   * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
+   * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
+   */
+  public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
+  {
+    this.serde1 = Preconditions.checkNotNull(serde1);
+    this.serde2 = Preconditions.checkNotNull(serde2);
+  }
+
+  /**
+   * Serializes both elements of the pair and concatenates the results.
+   *
+   * @param pair The pair to serialize
+   * @return A slice holding the bytes of the left element followed by the bytes of the right element
+   */
+  @Override
+  public Slice serialize(Pair<T1, T2> pair)
+  {
+    Slice left = serde1.serialize(pair.getLeft());
+    Slice right = serde2.serialize(pair.getRight());
+
+    byte[] combined = new byte[left.length + right.length];
+    System.arraycopy(left.buffer, left.offset, combined, 0, left.length);
+    System.arraycopy(right.buffer, right.offset, combined, left.length, right.length);
+
+    return new Slice(combined);
+  }
+
+  /**
+   * Reads the left element and then the right element from the slice. Both reads share {@code offset}, so
+   * the second read starts wherever the first serde left the offset.
+   *
+   * @param slice The slice to read from
+   * @param offset The position at which the pair begins; updated by the element serdes
+   * @return The deserialized pair
+   */
+  @Override
+  public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
+  {
+    // arguments are evaluated left to right, so the left element is read first
+    return new ImmutablePair<>(serde1.deserialize(slice, offset), serde2.deserialize(slice, offset));
+  }
+
+  /**
+   * Reads a pair from the start of the slice.
+   *
+   * @param slice The slice to read from
+   * @return The deserialized pair
+   */
+  @Override
+  public Pair<T1, T2> deserialize(Slice slice)
+  {
+    return deserialize(slice, new MutableInt(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
new file mode 100644
index 0000000..3883191
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for {@link SpillableSetImpl}. The same scenario is run against an in-memory store and against
+ * the store provided by the test rule.
+ */
+public class SpillableSetImplTest
+{
+ public static final byte[] ID1 = new byte[]{(byte)0};
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ // runs the shared scenario against a fresh InMemSpillableStateStore
+ @Test
+ public void simpleAddGetAndSetTest1()
+ {
+ InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+ simpleAddGetAndSetTest1Helper(store);
+ }
+
+ // runs the shared scenario against the store supplied by the test rule
+ @Test
+ public void simpleAddGetAndSetManagedStateTest1()
+ {
+ simpleAddGetAndSetTest1Helper(testMeta.store);
+ }
+
+ /**
+ * Shared scenario: adds, iterates over and removes elements of a set across three windows, checking size and
+ * membership along the way.
+ *
+ * @param store the store backing the set under test
+ */
+ public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+ {
+ SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice());
+
+ store.setup(testMeta.operatorContext);
+ set.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ set.beginWindow(windowId);
+
+ Assert.assertEquals(0, set.size());
+
+ set.add("a");
+
+ Assert.assertEquals(1, set.size());
+
+ Assert.assertTrue(set.contains("a"));
+
+ // "a" is already present, so only "b" and "c" increase the size
+ set.addAll(Lists.newArrayList("a", "b", "c"));
+
+ Assert.assertEquals(3, set.size());
+
+ Assert.assertTrue(set.contains("a"));
+ Assert.assertTrue(set.contains("b"));
+ Assert.assertTrue(set.contains("c"));
+
+ // the iterator must visit each of the three elements exactly once
+ HashSet<String> result = new HashSet<>();
+ Iterator<String> it = set.iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ result.add(it.next());
+ i++;
+ }
+ Assert.assertTrue(result.containsAll(Lists.newArrayList("a", "b", "c")));
+ Assert.assertEquals(3, i);
+
+ // removal through the iterator
+ it = set.iterator();
+ while (it.hasNext()) {
+ if ("b".equals(it.next())) {
+ it.remove();
+ }
+ }
+ Assert.assertEquals(2, set.size());
+ Assert.assertTrue(set.contains("a"));
+ Assert.assertFalse(set.contains("b"));
+ Assert.assertTrue(set.contains("c"));
+
+ set.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ // second window: more additions followed by a direct remove
+ windowId++;
+ store.beginWindow(windowId);
+ set.beginWindow(windowId);
+
+ set.add("tt");
+ set.add("ab");
+ set.add("99");
+ set.add("oo");
+
+ Assert.assertTrue(set.contains("tt"));
+ Assert.assertTrue(set.contains("ab"));
+ Assert.assertTrue(set.contains("99"));
+ Assert.assertTrue(set.contains("oo"));
+
+ set.remove("ab");
+
+ Assert.assertTrue(set.contains("tt"));
+ Assert.assertFalse(set.contains("ab"));
+ Assert.assertTrue(set.contains("99"));
+ Assert.assertTrue(set.contains("oo"));
+
+ set.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ // third window: no changes, only the window and checkpoint callbacks are exercised
+ windowId++;
+ store.beginWindow(windowId);
+ set.beginWindow(windowId);
+
+ set.endWindow();
+ store.endWindow();
+ store.beforeCheckpoint(windowId);
+ store.checkpointed(windowId);
+ store.committed(windowId);
+
+ set.teardown();
+ store.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
new file mode 100644
index 0000000..e9903ec
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Unit tests for {@link SpillableSetMultimapImpl}.
+ */
+public class SpillableSetMultimapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  // runs the multi-key scenario against a fresh InMemSpillableStateStore
+  @Test
+  public void simpleMultiKeyTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleMultiKeyTestHelper(store);
+  }
+
+  // runs the multi-key scenario against the store supplied by the test rule
+  @Test
+  public void simpleMultiKeyManagedStateTest()
+  {
+    simpleMultiKeyTestHelper(testMeta.store);
+  }
+
+  /**
+   * Runs the single-key scenario for the keys "a", "b" and "c" in turn and checks after each one that the
+   * size reported by the multimap has grown by one.
+   *
+   * @param store the store backing the multimap under test
+   */
+  public void simpleMultiKeyTestHelper(SpillableStateStore store)
+  {
+    SpillableSetMultimapImpl<String, String> map =
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(2, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    // keep the window id returned by the helper, as for "a" and "b"; otherwise the window started below
+    // would reuse an id that the helper has already gone past
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(3, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  /**
+   * Single-key scenario spread over five windows: put, removeAll, put again plus bulk add through the
+   * returned set, further adds in a later window, and a final size check.
+   *
+   * @param store the store backing the multimap
+   * @param map the multimap under test
+   * @param key the key exercised by this run; it must not be present in the multimap yet
+   * @param nextWindowId the id of the last window that has been used; the first window of this run is
+   *   {@code nextWindowId + 1}
+   * @return the id of the last window used by this run
+   */
+  public long simpleMultiKeyTestHelper(SpillableStateStore store,
+      SpillableSetMultimapImpl<String, String> map, String key, long nextWindowId)
+  {
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertNull(map.get(key));
+
+    Assert.assertFalse(map.containsKey(key));
+
+    map.put(key, "a");
+
+    Assert.assertTrue(map.containsKey(key));
+
+    Set<String> set1 = map.get(key);
+    Assert.assertEquals(1, set1.size());
+    Iterator<String> it = set1.iterator();
+
+    Assert.assertEquals("a", it.next());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    map.removeAll(key);
+    Assert.assertFalse(map.containsKey(key));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    // the removal must still be visible in the next window
+    Assert.assertFalse(map.containsKey(key));
+    map.put(key, "a");
+    set1 = map.get(key);
+    Assert.assertEquals(1, set1.size());
+    // "a" is already present, so the set ends up with seven elements
+    set1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+    Assert.assertEquals(7, set1.size());
+
+    Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g");
+    Assert.assertTrue(referenceSet.containsAll(set1));
+    Assert.assertTrue(set1.containsAll(referenceSet));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Set<String> set2 = map.get(key);
+
+    Assert.assertEquals(7, set2.size());
+    Assert.assertTrue(referenceSet.containsAll(set2));
+    Assert.assertTrue(set2.containsAll(referenceSet));
+
+    set2.add("tt");
+    set2.add("ab");
+    set2.add("99");
+    set2.add("oo");
+    referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo");
+    Assert.assertTrue(referenceSet.containsAll(set2));
+    Assert.assertTrue(set2.containsAll(referenceSet));
+
+    Assert.assertEquals(11, set2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(11, set2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    return nextWindowId;
+  }
+
+  /**
+   * Clones the multimap at a checkpoint, makes a further change to the original, and then verifies that the
+   * clone, set up again with the checkpointed window as activation window, only sees the checkpointed state.
+   */
+  @Test
+  public void recoveryTestWithManagedState()
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableSetMultimapImpl<String, String> map =
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    long activationWindow = nextWindowId;
+    store.beforeCheckpoint(nextWindowId);
+    // snapshot of the multimap as of the checkpoint
+    SpillableSetMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
+    store.checkpointed(nextWindowId);
+    store.committed(nextWindowId);
+
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Set<String> set1 = map.get("a");
+
+    Assert.assertEquals(11, set1.size());
+
+    Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo");
+    Assert.assertTrue(referenceSet.containsAll(set1));
+    Assert.assertTrue(set1.containsAll(referenceSet));
+
+    // this addition happens after the checkpoint and must not be visible after recovery
+    set1.add("111");
+
+    Assert.assertTrue(set1.contains("111"));
+
+    Assert.assertEquals(12, set1.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+
+    // continue with the clone taken at the checkpoint
+    map = clonedMap;
+    store = map.getStore();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    store.setup(context);
+    map.setup(context);
+
+    nextWindowId = activationWindow + 1;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertTrue(map.containsKey("a"));
+    Assert.assertEquals(11, map.get("a").size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  /**
+   * Puts a large number of random key/value pairs into the multimap within a single window. There are no
+   * assertions; the test only checks that the puts and the end of the window complete without an exception.
+   */
+  @Test
+  public void testLoad()
+  {
+    Random random = new Random();
+    final int keySize = 1000000;
+    final int valueSize = 100000000;
+    final int numOfEntry = 100000;
+
+    SpillableStateStore store = testMeta.store;
+
+    // exercise the class under test rather than the list-based multimap
+    SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(
+        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+    store.setup(context);
+    multimap.setup(context);
+
+    store.beginWindow(1);
+    multimap.beginWindow(1);
+    for (int i = 0; i < numOfEntry; ++i) {
+      multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize)));
+    }
+    multimap.endWindow();
+    store.endWindow();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
new file mode 100644
index 0000000..b780f66
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Unit tests for {@link SerdeKryoSlice}: one round trip with a known class and one where the class is
+ * written into the serialized form.
+ */
+public class SerdeKryoSliceTest
+{
+  /**
+   * Simple value type used for the round-trip test.
+   */
+  public static class TestPojo
+  {
+    private TestPojo()
+    {
+      // no-arg constructor, presumably needed so that Kryo can instantiate the class
+    }
+
+    public TestPojo(int intValue, String stringValue)
+    {
+      this.intValue = intValue;
+      this.stringValue = stringValue;
+    }
+
+    /**
+     * Two pojos are equal when both fields are equal. Any other object, including {@code null}, is not
+     * equal to a pojo.
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+      if (this == other) {
+        return true;
+      }
+      // also rejects null, so the cast below cannot fail
+      if (!(other instanceof TestPojo)) {
+        return false;
+      }
+      TestPojo o = (TestPojo)other;
+      return intValue == o.intValue
+          && (stringValue == null ? o.stringValue == null : stringValue.equals(o.stringValue));
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode()
+    {
+      return 31 * intValue + (stringValue == null ? 0 : stringValue.hashCode());
+    }
+
+    int intValue;
+    String stringValue;
+  }
+
+  // round trip with the class supplied to the serde
+  @Test
+  public void stringListTest()
+  {
+    SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class);
+
+    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
+    Slice slice = serdeList.serialize(stringList);
+    List<String> deserializedList = serdeList.deserialize(slice);
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  // round trip without a class, so the class is written together with the object
+  @Test
+  public void pojoTest()
+  {
+    SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>();
+    TestPojo pojo = new TestPojo(345, "xyz");
+    Slice slice = serdePojo.serialize(pojo);
+    TestPojo deserializedPojo = serdePojo.deserialize(slice);
+    Assert.assertEquals(pojo, deserializedPojo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
new file mode 100644
index 0000000..6684a9f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Unit test for {@link SerdePairSlice}.
+ */
+public class SerdePairSliceTest
+{
+  // a pair must survive a serialize/deserialize round trip unchanged
+  @Test
+  public void simpleSerdeTest()
+  {
+    final Pair<String, Integer> expected = new ImmutablePair<>("abc", 123);
+    final SerdePairSlice<String, Integer> serde =
+        new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice());
+
+    final Slice serialized = serde.serialize(expected);
+    final Pair<String, Integer> actual = serde.deserialize(serialized);
+
+    Assert.assertEquals(expected, actual);
+  }
+}