You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/05/20 06:51:52 UTC

[1/2] incubator-apex-malhar git commit: - APEXMALHAR-2070 #resolve #comment - Added SpillableMultiSetInterface - Added in memory implementations of SpillableArrayList, SpillableMultiset, and SpillableByteArrayListMultimap

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 27e52d8fe -> debf3c0cc


- APEXMALHAR-2070 #resolve #comment
- Added SpillableMultiSetInterface
- Added in memory implementations of SpillableArrayList, SpillableMultiset, and SpillableByteArrayListMultimap


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9319ee7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9319ee7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9319ee7c

Branch: refs/heads/master
Commit: 9319ee7c3d80d57a8f925bc180b8400186da0c1a
Parents: 2138512
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sat Apr 30 23:23:52 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu May 19 23:38:59 2016 -0700

----------------------------------------------------------------------
 .../malhar/lib/state/spillable/Spillable.java   |  20 ++-
 .../spillable/SpillableComplexComponent.java    |  42 +++--
 .../state/spillable/inmem/InMemMultiset.java    | 161 +++++++++++++++++
 .../inmem/InMemSpillableArrayList.java          | 175 +++++++++++++++++++
 .../InMemSpillableByteArrayListMultimap.java    | 154 ++++++++++++++++
 .../inmem/InMemSpillableComplexComponent.java   | 117 +++++++++++++
 .../spillable/inmem/InMemMultisetTest.java      |  44 +++++
 .../inmem/InMemSpillableArrayListTest.java      |  44 +++++
 ...InMemSpillableByteArrayListMultimapTest.java |  45 +++++
 9 files changed, 788 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index bc7a6d5..7b55614 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Queue;
 
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multiset;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
@@ -44,8 +45,9 @@ public interface Spillable
   }
 
   /**
-   * This represents a spillable {@link java.util.Map}. The underlying implementation
-   * of this map uses the serialized representation of the key object as a key. User's that receive an
+   * This represents a spillable {@link java.util.Map}. Implementations make
+   * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
+   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <K> The type of the keys.
@@ -56,7 +58,9 @@ public interface Spillable
   }
 
   /**
-   * This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. User's that receive an
+   * This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. Implementations make
+   * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
+   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <K> The type of the keys.
@@ -67,6 +71,16 @@ public interface Spillable
   }
 
   /**
+   * This represents a spillable {@link com.google.common.collect.Multiset} implementation. Implementations assume
+   * that, for two elements T1 and T2, T1.equals(T2) is consistent with equality of their serialized bytes. Users
+   * of an implementation don't need to worry about propagating operator call-backs to the data structure.
+   * @param <T> The type of the elements.
+   */
+  interface SpillableByteMultiset<T> extends Multiset<T>
+  {
+  }
+
+  /**
    * This represents a spillable {@link java.util.Queue} implementation. User's that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index d157ba0..0c8c7db 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -23,7 +23,7 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
-
+import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is a composite component containing spillable data structures. This should be used as
@@ -39,7 +39,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
    * @return A {@link SpillableArrayList}.
    */
-  <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, byte[]> serde);
+  <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
 
   /**
    * This is a method for creating a {@link SpillableArrayList}.
@@ -49,7 +49,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
    * @return A {@link SpillableArrayList}.
    */
-  <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, byte[]> serde);
+  <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
 
   /**
    * This is a method for creating a {@link SpillableByteMap}. This method
@@ -61,8 +61,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
    * @return A {@link SpillableByteMap}.
    */
-  <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, byte[]> serdeKey,
-      Serde<V, byte[]> serdeValue);
+  <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableByteMap}.
@@ -75,7 +75,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableByteMap}.
    */
   <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
-      Serde<K, byte[]> serdeKey, Serde<V, byte[]> serdeValue);
+      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableByteArrayListMultimap}. This method
@@ -88,7 +88,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableByteArrayListMultimap}.
    */
   <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
-      byte[]> serdeKey, Serde<V, byte[]> serdeValue);
+      Slice> serdeKey, Serde<V, Slice> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableByteArrayListMultimap}.
@@ -101,8 +101,28 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableByteArrayListMultimap}.
    */
   <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
-      Serde<K, byte[]> serdeKey,
-      Serde<V, byte[]> serdeValue);
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue);
+
+  /**
+   * This is a method for creating a {@link SpillableByteMultiset}. This method
+   * auto-generates an identifier for the data structure.
+   * @param <T> The type of the elements.
+   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
+   * @return A {@link SpillableByteMultiset}.
+   */
+  <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde);
+
+  /**
+   * This is a method for creating a {@link SpillableByteMultiset}.
+   * @param <T> The type of the elements.
+   * @param identifier The identifier for this {@link SpillableByteMultiset}.
+   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
+   * @return A {@link SpillableByteMultiset}.
+   */
+  <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}. This method
@@ -112,7 +132,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, byte[]> serde);
+  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}.
@@ -122,5 +142,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, byte[]> serde);
+  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
new file mode 100644
index 0000000..fa7bf08
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultiset;
+
+/**
+ * An in memory {@link Spillable.SpillableByteMultiset} that forwards every call to a backing {@link HashMultiset}.
+ * @param <T> The type of the data stored in the {@link InMemMultiset}
+ */
+public class InMemMultiset<T> implements Spillable.SpillableByteMultiset<T>
+{
+  @FieldSerializer.Bind(JavaSerializer.class) // Kryo serializes this field with JavaSerializer
+  private HashMultiset<T> multiset = HashMultiset.create();
+
+  @Override
+  public int count(@Nullable Object element)
+  {
+    return multiset.count(element);
+  }
+
+  @Override
+  public int add(@Nullable T element, int occurrences)
+  {
+    return multiset.add(element, occurrences);
+  }
+
+  @Override
+  public int remove(@Nullable Object element, int occurrences)
+  {
+    return multiset.remove(element, occurrences);
+  }
+
+  @Override
+  public int setCount(T element, int count)
+  {
+    return multiset.setCount(element, count);
+  }
+
+  @Override
+  public boolean setCount(T element, int oldCount, int newCount)
+  {
+    return multiset.setCount(element, oldCount, newCount);
+  }
+
+  @Override
+  public Set<T> elementSet()
+  {
+    return multiset.elementSet();
+  }
+
+  @Override
+  public Set<Entry<T>> entrySet()
+  {
+    return multiset.entrySet();
+  }
+
+  @Override
+  public Iterator<T> iterator()
+  {
+    return multiset.iterator();
+  }
+
+  @Override
+  public Object[] toArray()
+  {
+    return multiset.toArray();
+  }
+
+  @Override
+  public <T1> T1[] toArray(T1[] t1s)
+  {
+    return multiset.toArray(t1s);
+  }
+
+  @Override
+  public int size()
+  {
+    return multiset.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return multiset.isEmpty();
+  }
+
+  @Override
+  public boolean contains(@Nullable Object element)
+  {
+    return multiset.contains(element);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> es)
+  {
+    return multiset.containsAll(es);
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> collection)
+  {
+    return multiset.addAll(collection);
+  }
+
+  @Override
+  public boolean add(T element)
+  {
+    return multiset.add(element);
+  }
+
+  @Override
+  public boolean remove(@Nullable Object element)
+  {
+    return multiset.remove(element);
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c)
+  {
+    return multiset.removeAll(c);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c)
+  {
+    return multiset.retainAll(c);
+  }
+
+  @Override
+  public void clear()
+  {
+    multiset.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
new file mode 100644
index 0000000..9742537
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An in memory {@link Spillable.SpillableArrayList} that forwards every call to a backing {@link List}.
+ * @param <T> The type of the data stored in the {@link InMemSpillableArrayList}
+ */
+public class InMemSpillableArrayList<T> implements Spillable.SpillableArrayList<T>
+{
+  private List<T> list = Lists.newArrayList(); // backing store for every delegated call
+
+  @Override
+  public int size()
+  {
+    return list.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return list.isEmpty();
+  }
+
+  @Override
+  public boolean contains(Object o)
+  {
+    return list.contains(o);
+  }
+
+  @Override
+  public Iterator<T> iterator()
+  {
+    return list.iterator();
+  }
+
+  @Override
+  public Object[] toArray()
+  {
+    return list.toArray();
+  }
+
+  @Override
+  public <T1> T1[] toArray(T1[] t1s)
+  {
+    return list.toArray(t1s);
+  }
+
+  @Override
+  public boolean add(T t)
+  {
+    return list.add(t);
+  }
+
+  @Override
+  public boolean remove(Object o)
+  {
+    return list.remove(o);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> collection)
+  {
+    return list.containsAll(collection);
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> collection)
+  {
+    return list.addAll(collection);
+  }
+
+  @Override
+  public boolean addAll(int i, Collection<? extends T> collection)
+  {
+    return list.addAll(i, collection);
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> collection)
+  {
+    return list.removeAll(collection);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> collection)
+  {
+    return list.retainAll(collection);
+  }
+
+  @Override
+  public void clear()
+  {
+    list.clear();
+  }
+
+  @Override
+  public T get(int i)
+  {
+    return list.get(i);
+  }
+
+  @Override
+  public T set(int i, T t)
+  {
+    return list.set(i, t);
+  }
+
+  @Override
+  public void add(int i, T t)
+  {
+    list.add(i, t);
+  }
+
+  @Override
+  public T remove(int i)
+  {
+    return list.remove(i);
+  }
+
+  @Override
+  public int indexOf(Object o)
+  {
+    return list.indexOf(o);
+  }
+
+  @Override
+  public int lastIndexOf(Object o)
+  {
+    return list.lastIndexOf(o);
+  }
+
+  @Override
+  public ListIterator<T> listIterator()
+  {
+    return list.listIterator();
+  }
+
+  @Override
+  public ListIterator<T> listIterator(int i)
+  {
+    return list.listIterator(i);
+  }
+
+  @Override
+  public List<T> subList(int i, int i1)
+  {
+    return list.subList(i, i1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
new file mode 100644
index 0000000..8376bd5
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+/**
+ * An in memory {@link Spillable.SpillableByteArrayListMultimap} backed by an {@link ArrayListMultimap}.
+ * @param <K> The type of the keys stored in the {@link InMemSpillableByteArrayListMultimap}
+ * @param <V> The type of the values stored in the {@link InMemSpillableByteArrayListMultimap}
+ */
+public class InMemSpillableByteArrayListMultimap<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>
+{
+  @FieldSerializer.Bind(JavaSerializer.class) // Kryo serializes this field with JavaSerializer
+  private ListMultimap<K, V> multimap = ArrayListMultimap.create();
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+    return multimap.get(key);
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    return multimap.keySet();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    return multimap.keys();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    return multimap.values();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    return multimap.entries();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object key)
+  {
+    return multimap.removeAll(key);
+  }
+
+  @Override
+  public void clear()
+  {
+    multimap.clear();
+  }
+
+  @Override
+  public int size()
+  {
+    return multimap.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return multimap.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return multimap.containsKey(key);
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    return multimap.containsValue(value);
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    return multimap.containsEntry(key, value);
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    return multimap.put(key, value);
+  }
+
+  @Override
+  public boolean remove(@Nullable Object key, @Nullable Object value)
+  {
+    return multimap.remove(key, value);
+  }
+
+  @Override
+  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+  {
+    return multimap.putAll(key, values);
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> m)
+  {
+    return multimap.putAll(m);
+  }
+
+  @Override
+  public List<V> replaceValues(K key, Iterable<? extends V> values)
+  {
+    return multimap.replaceValues(key, values);
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    return multimap.asMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
new file mode 100644
index 0000000..25e8b2c
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An in-memory {@link SpillableComplexComponent}; factory arguments are ignored and lifecycle callbacks are no-ops.
+ */
+public class InMemSpillableComplexComponent implements SpillableComplexComponent
+{
+  @Override
+  public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+  {
+    return new InMemSpillableArrayList<>();
+  }
+
+  @Override
+  public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket,
+      Serde<T, Slice> serde)
+  {
+    return new InMemSpillableArrayList<>();
+  }
+
+  @Override
+  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    throw new UnsupportedOperationException(); // no in-memory SpillableByteMap is provided here
+  }
+
+  @Override
+  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
+  {
+    throw new UnsupportedOperationException(); // no in-memory SpillableByteMap is provided here
+  }
+
+  @Override
+  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket,
+      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
+  {
+    return new InMemSpillableByteArrayListMultimap<>();
+  }
+
+  @Override
+  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier,
+      long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
+  {
+    return  new InMemSpillableByteArrayListMultimap<>();
+  }
+
+  @Override
+  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
+  {
+    return new InMemMultiset<>();
+  }
+
+  @Override
+  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket,
+      Serde<T, Slice> serde)
+  {
+    return new InMemMultiset<>();
+  }
+
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException(); // no in-memory SpillableQueue is provided here
+  }
+
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException(); // no in-memory SpillableQueue is provided here
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
new file mode 100644
index 0000000..d73bd95
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemMultiset;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class InMemMultisetTest
+{
+  // Clones a multiset holding "a" twice with KryoCloneUtils and checks the count seen by the clone.
+  @Test
+  public void serializationTest()
+  {
+    final InMemMultiset<String> original = new InMemMultiset<>();
+    original.add("a");
+    original.add("a");
+
+    final Kryo kryo = new Kryo();
+    final InMemMultiset<String> copy = KryoCloneUtils.cloneObject(kryo, original);
+    Assert.assertEquals(2, copy.count("a"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
new file mode 100644
index 0000000..573982a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableArrayList;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/** Checks that an {@link InMemSpillableArrayList} keeps its elements when cloned with Kryo. */
+public class InMemSpillableArrayListTest
+{
+  @Test
+  public void serializationTest()
+  {
+    InMemSpillableArrayList<String> list = new InMemSpillableArrayList<>();
+    list.add("a");
+    list.add("a");
+
+    InMemSpillableArrayList<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), list);
+    // Assert on the clone: checking the source list would pass even if cloning dropped elements.
+    Assert.assertEquals(2, cloned.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9319ee7c/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
new file mode 100644
index 0000000..a6bf811
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableByteArrayListMultimap;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/** Checks that an {@link InMemSpillableByteArrayListMultimap} keeps its values when cloned with Kryo. */
+public class InMemSpillableByteArrayListMultimapTest
+{
+  @Test
+  public void serializationTest()
+  {
+    // Store two values under a single key.
+    final InMemSpillableByteArrayListMultimap<String, String> original = new InMemSpillableByteArrayListMultimap<>();
+    original.put("a", "b");
+    original.put("a", "c");
+    // Clone via KryoCloneUtils; the copy must still hold both values for the key.
+    final InMemSpillableByteArrayListMultimap<String, String> copy =
+        KryoCloneUtils.cloneObject(new Kryo(), original);
+    Assert.assertEquals(2, copy.get("a").size());
+  }
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2070'

Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2070'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/debf3c0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/debf3c0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/debf3c0c

Branch: refs/heads/master
Commit: debf3c0cc291a6122967b958b32ff575ec8e6f75
Parents: 27e52d8 9319ee7
Author: Chandni Singh <cs...@apache.org>
Authored: Thu May 19 23:42:10 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu May 19 23:42:10 2016 -0700

----------------------------------------------------------------------
 .../malhar/lib/state/spillable/Spillable.java   |  20 ++-
 .../spillable/SpillableComplexComponent.java    |  42 +++--
 .../state/spillable/inmem/InMemMultiset.java    | 161 +++++++++++++++++
 .../inmem/InMemSpillableArrayList.java          | 175 +++++++++++++++++++
 .../InMemSpillableByteArrayListMultimap.java    | 154 ++++++++++++++++
 .../inmem/InMemSpillableComplexComponent.java   | 117 +++++++++++++
 .../spillable/inmem/InMemMultisetTest.java      |  44 +++++
 .../inmem/InMemSpillableArrayListTest.java      |  44 +++++
 ...InMemSpillableByteArrayListMultimapTest.java |  45 +++++
 9 files changed, 788 insertions(+), 14 deletions(-)
----------------------------------------------------------------------