You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/27 23:08:27 UTC
[2/2] apex-malhar git commit: APEXMALHAR-2267 #resolve renamed spillable data structures to remove the word "Byte"
APEXMALHAR-2267 #resolve renamed spillable data structures to remove the word "Byte"
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f5f1943d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f5f1943d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f5f1943d
Branch: refs/heads/master
Commit: f5f1943d2bcc2b61240fab649828c1aaf520d22b
Parents: c19c80d
Author: David Yan <da...@datatorrent.com>
Authored: Fri Sep 23 12:46:20 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Sep 27 14:26:46 2016 -0700
----------------------------------------------------------------------
.../lib/join/AbstractInnerJoinOperator.java | 12 +-
.../AbstractManagedStateInnerJoinOperator.java | 4 +-
.../managed/ManagedTimeStateMultiValue.java | 4 +-
.../malhar/lib/state/spillable/Spillable.java | 21 +-
.../state/spillable/SpillableArrayListImpl.java | 6 +-
.../SpillableArrayListMultimapImpl.java | 310 ++++++++++++
.../SpillableByteArrayListMultimapImpl.java | 310 ------------
.../state/spillable/SpillableByteMapImpl.java | 237 ---------
.../spillable/SpillableComplexComponent.java | 86 ++--
.../SpillableComplexComponentImpl.java | 24 +-
.../lib/state/spillable/SpillableMapImpl.java | 237 +++++++++
.../lib/state/spillable/SpillableSetImpl.java | 4 +-
.../spillable/SpillableSetMultimapImpl.java | 4 +-
.../impl/SpillableWindowedKeyedStorage.java | 4 +-
.../impl/SpillableWindowedPlainStorage.java | 4 +-
.../SpillableArrayListMultimapImplTest.java | 370 ++++++++++++++
.../SpillableByteArrayListMultimapImplTest.java | 371 --------------
.../spillable/SpillableByteMapImplTest.java | 484 -------------------
.../SpillableComplexComponentImplTest.java | 2 +-
.../state/spillable/SpillableMapImplTest.java | 484 +++++++++++++++++++
.../spillable/SpillableSetMultimapImplTest.java | 3 +-
.../malhar/lib/window/WindowedOperatorTest.java | 4 +-
22 files changed, 1495 insertions(+), 1490 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
index 816ca58..c1ebdd5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
@@ -88,8 +88,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
private boolean isLeftKeyPrimary = false;
private boolean isRightKeyPrimary = false;
protected SpillableComplexComponent component;
- protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
- protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+ protected Spillable.SpillableListMultimap<K,T> stream1Data;
+ protected Spillable.SpillableListMultimap<K,T> stream2Data;
/**
* Process the tuple which are received from input ports with the following steps:
@@ -103,12 +103,12 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
*/
protected void processTuple(T tuple, boolean isStream1Data)
{
- Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+ Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
K key = extractKey(tuple,isStream1Data);
if (!store.put(key, tuple)) {
return;
}
- Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+ Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
joinStream(tuple,isStream1Data, valuestore.get(key));
}
@@ -210,8 +210,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
*/
public void createStores()
{
- stream1Data = component.newSpillableByteArrayListMultimap(0,null,null);
- stream2Data = component.newSpillableByteArrayListMultimap(0,null,null);
+ stream1Data = component.newSpillableArrayListMultimap(0,null,null);
+ stream2Data = component.newSpillableArrayListMultimap(0,null,null);
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index 8b19ebc..c82c3e3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -93,13 +93,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
@Override
protected void processTuple(T tuple, boolean isStream1Data)
{
- Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+ Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
K key = extractKey(tuple,isStream1Data);
long timeBucket = extractTime(tuple,isStream1Data);
if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) {
return;
}
- Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+ Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key);
if (future.isDone()) {
try {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index 3ca43a4..beeeb4e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -42,7 +42,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.netlet.util.Slice;
/**
- * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ * Concrete implementation of SpillableListMultimap which is needed for join operator.
*
* <b>Properties:</b><br>
* <b>isKeyContainsMultiValue</b>: Specifies whether the key has multiple value or not. <br>
@@ -52,7 +52,7 @@ import com.datatorrent.netlet.util.Slice;
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListMultimap<K,V>
{
private transient StreamCodec streamCodec = null;
private boolean isKeyContainsMultiValue = false;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 849389b..6b765a8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -38,13 +38,13 @@ import com.datatorrent.api.Context.OperatorContext;
public interface Spillable
{
/**
- * This represents a spillable {@link java.util.List}. The underlying implementation
- * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an
+ * This represents a spillable {@link java.util.List}. Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
- * @param <T> The type of the data stored in the {@link SpillableArrayList}.
+ *
+ * @param <T> The type of the data stored in the {@link SpillableList}.
*/
- interface SpillableArrayList<T> extends List<T>
+ interface SpillableList<T> extends List<T>
{
}
@@ -52,6 +52,7 @@ public interface Spillable
* This represents a spillable {@link java.util.Set}. Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
+ *
* @param <T> The type of the data stored in the {@link SpillableSet}.
*/
interface SpillableSet<T> extends Set<T>
@@ -64,10 +65,11 @@ public interface Spillable
* that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
+ *
* @param <K> The type of the keys.
* @param <V> The type of the values.
*/
- interface SpillableByteMap<K, V> extends Map<K, V>
+ interface SpillableMap<K, V> extends Map<K, V>
{
}
@@ -77,10 +79,11 @@ public interface Spillable
* that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
+ *
* @param <K> The type of the keys.
* @param <V> The type of the values.
*/
- interface SpillableByteArrayListMultimap<K, V> extends ListMultimap<K, V>
+ interface SpillableListMultimap<K, V> extends ListMultimap<K, V>
{
}
@@ -90,6 +93,7 @@ public interface Spillable
* that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
+ *
* @param <K> The type of the keys.
* @param <V> The type of the values.
*/
@@ -102,8 +106,10 @@ public interface Spillable
* some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is
* that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs to the data structure.
+ *
+ * @param <T> The type of the data stored in the multiset.
*/
- interface SpillableByteMultiset<T> extends Multiset<T>
+ interface SpillableMultiset<T> extends Multiset<T>
{
}
@@ -111,6 +117,7 @@ public interface Spillable
* This represents a spillable {@link java.util.Queue} implementation. Users that receive an
* implementation of this interface don't need to worry about propagating operator call-backs
* to the data structure.
+ *
* @param <T> The type of the data stored in the queue.
*/
interface SpillableQueue<T> extends Queue<T>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index 4ea1923..a59872c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -47,7 +47,7 @@ import com.datatorrent.netlet.util.Slice;
*/
@DefaultSerializer(FieldSerializer.class)
@InterfaceStability.Evolving
-public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
+public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Spillable.SpillableComponent
{
public static final int DEFAULT_BATCH_SIZE = 1000;
@@ -60,7 +60,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
@NotNull
private Serde<T, Slice> serde;
@NotNull
- private SpillableByteMapImpl<Integer, List<T>> map;
+ private SpillableMapImpl<Integer, List<T>> map;
private boolean sizeCached = false;
private int size;
@@ -93,7 +93,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
this.store = Preconditions.checkNotNull(store);
this.serde = Preconditions.checkNotNull(serde);
- map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
+ map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
new file mode 100644
index 0000000..0944583
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
+ * Each key's values live in a {@link SpillableArrayListImpl}; removals and collection views are unsupported.
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableArrayListMultimapImpl<K, V> implements Spillable.SpillableListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+  // Per-key list views; endWindow() writes back every key the cache reports as changed.
+  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+  // NOTE(review): batchSize is never read in this class, but it is still part of the FieldSerializer state.
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableMapImpl<Slice, Integer> map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  private SpillableArrayListMultimapImpl()
+  {
+    // no-arg constructor, only for Kryo deserialization
+  }
+
+  /**
+   * Creates a {@link SpillableArrayListMultimapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    // One entry per key: (serialized key + SIZE_KEY_SUFFIX) -> number of values stored for that key.
+    map = new SpillableMapImpl<>(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+  // NOTE(review): returns null for an absent key, whereas Guava's ListMultimap.get returns an empty list.
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+    // Not cached yet: rebuild the list view from the size persisted in the store, if there is one.
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
+
+      if (size == null) {
+        return null;
+      }
+      // The list's elements are stored under the prefix identifier + serialized key.
+      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableArrayList.setSize(size);
+    }
+    // NOTE(review): re-put on every access, presumably so endWindow() flushes edits made via the returned List.
+    cache.put(key, spillableArrayList);
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    // TODO: Multimap.size() should count key-value pairs; map holds one entry per key, so this counts keys.
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
+        SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key);
+    if (spillableArrayList == null) {
+      return false;
+    }
+    for (int i = 0; i < spillableArrayList.size(); i++) {
+      V v = spillableArrayList.get(i);
+      if (v == null) {
+        if (value == null) {
+          return true;
+        }
+      } else {
+        if (v.equals(value)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
+    // First value for this key: create an empty list under identifier + serialized key.
+    if (spillableArrayList == null) {
+      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    spillableArrayList.add(value);
+    return true;
+  }
+
+  @Override
+  public boolean remove(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+  {
+    boolean changed = false;
+
+    for (V value: values) {
+      changed |= put(key, value);
+    }
+
+    return changed;
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+  {
+    boolean changed = false;
+
+    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+      changed |= put(entry.getKey(), entry.getValue());
+    }
+
+    return changed;
+  }
+
+  @Override
+  public List<V> replaceValues(K key, Iterable<? extends V> values)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+    isRunning = true;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+    isInWindow = true;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    isInWindow = false;
+    for (K key: cache.getChangedKeys()) {
+      // Let the list finish its window, then persist its current size under the key's size entry.
+      SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+      spillableArrayList.endWindow();
+
+      map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
+          spillableArrayList.size());
+    }
+    // remove()/removeAll()/clear() all throw, so the cache should never report a removed key.
+    Preconditions.checkState(cache.getRemovedKeys().isEmpty());
+    cache.endWindow();
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    isRunning = false;
+    map.teardown();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
deleted file mode 100644
index c0466bd..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
-
-import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
- *
- * @since 3.5.0
- */
-@DefaultSerializer(FieldSerializer.class)
-@InterfaceStability.Evolving
-public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
- Spillable.SpillableComponent
-{
- public static final int DEFAULT_BATCH_SIZE = 1000;
- public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
-
- private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
- private transient boolean isRunning = false;
- private transient boolean isInWindow = false;
-
- private int batchSize = DEFAULT_BATCH_SIZE;
- @NotNull
- private SpillableByteMapImpl<Slice, Integer> map;
- private SpillableStateStore store;
- private byte[] identifier;
- private long bucket;
- private Serde<K, Slice> serdeKey;
- private Serde<V, Slice> serdeValue;
-
- private SpillableByteArrayListMultimapImpl()
- {
- // for kryo
- }
-
- /**
- * Creates a {@link SpillableByteArrayListMultimapImpl}.
- * @param store The {@link SpillableStateStore} in which to spill to.
- * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}.
- * @param bucket The Id of the bucket used to store this
- * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
- */
- public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
- {
- this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
- this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
-
- map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
- }
-
- public SpillableStateStore getStore()
- {
- return store;
- }
-
- @Override
- public List<V> get(@Nullable K key)
- {
- return getHelper(key);
- }
-
- private SpillableArrayListImpl<V> getHelper(@Nullable K key)
- {
- SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
-
- if (spillableArrayList == null) {
- Slice keySlice = serdeKey.serialize(key);
- Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
- if (size == null) {
- return null;
- }
-
- Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
- spillableArrayList.setSize(size);
- }
-
- cache.put(key, spillableArrayList);
-
- return spillableArrayList;
- }
-
- @Override
- public Set<K> keySet()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Multiset<K> keys()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<V> values()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<Map.Entry<K, V>> entries()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<V> removeAll(@Nullable Object key)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int size()
- {
- // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
- return map.size();
- }
-
- @Override
- public boolean isEmpty()
- {
- return map.isEmpty();
- }
-
- @Override
- public boolean containsKey(@Nullable Object key)
- {
- return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
- SIZE_KEY_SUFFIX));
- }
-
- @Override
- public boolean containsValue(@Nullable Object value)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean containsEntry(@Nullable Object key, @Nullable Object value)
- {
- SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key);
- if (spillableArrayList == null) {
- return false;
- }
- for (int i = 0; i < spillableArrayList.size(); i++) {
- V v = spillableArrayList.get(i);
- if (v == null) {
- if (value == null) {
- return true;
- }
- } else {
- if (v.equals(value)) {
- return true;
- }
- }
- }
- return false;
- }
-
- @Override
- public boolean put(@Nullable K key, @Nullable V value)
- {
- SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
-
- if (spillableArrayList == null) {
- Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
- cache.put(key, spillableArrayList);
- }
-
- spillableArrayList.add(value);
- return true;
- }
-
- @Override
- public boolean remove(@Nullable Object key, @Nullable Object value)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean putAll(@Nullable K key, Iterable<? extends V> values)
- {
- boolean changed = false;
-
- for (V value: values) {
- changed |= put(key, value);
- }
-
- return changed;
- }
-
- @Override
- public boolean putAll(Multimap<? extends K, ? extends V> multimap)
- {
- boolean changed = false;
-
- for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
- changed |= put(entry.getKey(), entry.getValue());
- }
-
- return changed;
- }
-
- @Override
- public List<V> replaceValues(K key, Iterable<? extends V> values)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<K, Collection<V>> asMap()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- map.setup(context);
- isRunning = true;
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- map.beginWindow(windowId);
- isInWindow = true;
- }
-
- @Override
- public void endWindow()
- {
- isInWindow = false;
- for (K key: cache.getChangedKeys()) {
-
- SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
- spillableArrayList.endWindow();
-
- Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
- spillableArrayList.size());
- }
-
- Preconditions.checkState(cache.getRemovedKeys().isEmpty());
- cache.endWindow();
- map.endWindow();
- }
-
- @Override
- public void teardown()
- {
- isRunning = false;
- map.teardown();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
deleted file mode 100644
index f36f2dc..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.apex.malhar.lib.state.BucketedState;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * A Spillable implementation of {@link Map}
- * @param <K> The types of keys.
- * @param <V> The types of values.
- *
- * @since 3.5.0
- */
-@DefaultSerializer(FieldSerializer.class)
-@InterfaceStability.Evolving
-public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent,
- Serializable
-{
- private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
- private transient MutableInt tempOffset = new MutableInt();
-
- @NotNull
- private SpillableStateStore store;
- @NotNull
- private byte[] identifier;
- private long bucket;
- @NotNull
- private Serde<K, Slice> serdeKey;
- @NotNull
- private Serde<V, Slice> serdeValue;
-
- private int size = 0;
-
- private SpillableByteMapImpl()
- {
- //for kryo
- }
-
- /**
- * Creats a {@link SpillableByteMapImpl}.
- * @param store The {@link SpillableStateStore} in which to spill to.
- * @param identifier The Id of this {@link SpillableByteMapImpl}.
- * @param bucket The Id of the bucket used to store this
- * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
- */
- public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
- {
- this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
- this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
- }
-
- public SpillableStateStore getStore()
- {
- return this.store;
- }
-
- @Override
- public int size()
- {
- return size;
- }
-
- @Override
- public boolean isEmpty()
- {
- return size == 0;
- }
-
- @Override
- public boolean containsKey(Object o)
- {
- return get(o) != null;
- }
-
- @Override
- public boolean containsValue(Object o)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public V get(Object o)
- {
- K key = (K)o;
-
- if (cache.getRemovedKeys().contains(key)) {
- return null;
- }
-
- V val = cache.get(key);
-
- if (val != null) {
- return val;
- }
-
- Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
-
- if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
- return null;
- }
-
- tempOffset.setValue(0);
- return serdeValue.deserialize(valSlice, tempOffset);
- }
-
- @Override
- public V put(K k, V v)
- {
- V value = get(k);
-
- if (value == null) {
- size++;
- }
-
- cache.put(k, v);
-
- return value;
- }
-
- @Override
- public V remove(Object o)
- {
- V value = get(o);
-
- if (value != null) {
- size--;
- }
-
- cache.remove((K)o);
-
- return value;
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> map)
- {
- for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
- put(entry.getKey(), entry.getValue());
- }
- }
-
- @Override
- public void clear()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<K> keySet()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<V> values()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<Entry<K, V>> entrySet()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- for (K key: cache.getChangedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- serdeValue.serialize(cache.get(key)));
- }
-
- for (K key: cache.getRemovedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
- }
-
- cache.endWindow();
- }
-
- @Override
- public void teardown()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index e4836c4..c4462d5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -36,75 +36,75 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
Operator.CheckpointNotificationListener
{
/**
- * This is a method for creating a {@link SpillableArrayList}. This method
+ * This is a method for creating a {@link SpillableList}. This method
* auto-generates an identifier for the data structure.
- * @param <T> The type of data stored in the {@link SpillableArrayList}.
- * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
- * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
- * @return A {@link SpillableArrayList}.
+ * @param <T> The type of data stored in the {@link SpillableList}.
+ * @param bucket The bucket that this {@link SpillableList} will be spilled to.
+ * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
+ * @return A {@link SpillableList}.
*/
- <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
/**
- * This is a method for creating a {@link SpillableArrayList}.
- * @param <T> The type of data stored in the {@link SpillableArrayList}.
- * @param identifier The identifier for this {@link SpillableArrayList}.
- * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
- * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
- * @return A {@link SpillableArrayList}.
+ * This is a method for creating a {@link SpillableList}.
+ * @param <T> The type of data stored in the {@link SpillableList}.
+ * @param identifier The identifier for this {@link SpillableList}.
+ * @param bucket The bucket that this {@link SpillableList} will be spilled to.
+ * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
+ * @return A {@link SpillableList}.
*/
- <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
/**
- * This is a method for creating a {@link SpillableByteMap}. This method
+ * This is a method for creating a {@link SpillableMap}. This method
* auto-generates an identifier for the data structure.
* @param <K> The type of the keys.
* @param <V> The type of the values.
- * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
+ * @param bucket The bucket that this {@link SpillableMap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the map's values.
- * @return A {@link SpillableByteMap}.
+ * @return A {@link SpillableMap}.
*/
- <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+ <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue);
/**
- * This is a method for creating a {@link SpillableByteMap}.
+ * This is a method for creating a {@link SpillableMap}.
* @param <K> The type of the keys.
* @param <V> The type of the values.
- * @param identifier The identifier for this {@link SpillableByteMap}.
- * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
+ * @param identifier The identifier for this {@link SpillableMap}.
+ * @param bucket The bucket that this {@link SpillableMap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the map's values.
- * @return A {@link SpillableByteMap}.
+ * @return A {@link SpillableMap}.
*/
- <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
+ <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
/**
- * This is a method for creating a {@link SpillableByteArrayListMultimap}. This method
+ * This is a method for creating a {@link SpillableListMultimap}. This method
* auto-generates an identifier for the data structure.
* @param <K> The type of the keys.
* @param <V> The type of the values in the map's lists.
- * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
+ * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
- * @return A {@link SpillableByteArrayListMultimap}.
+ * @return A {@link SpillableListMultimap}.
*/
- <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+ <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
Slice> serdeKey, Serde<V, Slice> serdeValue);
/**
- * This is a method for creating a {@link SpillableByteArrayListMultimap}.
+ * This is a method for creating a {@link SpillableListMultimap}.
* @param <K> The type of the keys.
* @param <V> The type of the values in the map's lists.
- * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
- * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
+ * @param identifier The identifier for this {@link SpillableListMultimap}.
+ * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to.
* @param serdeKey The Serializer/Deserializer to use for the map's keys.
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
- * @return A {@link SpillableByteArrayListMultimap}.
+ * @return A {@link SpillableListMultimap}.
*/
- <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+ <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue);
@@ -121,24 +121,24 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
Slice> serdeKey, Serde<V, Slice> serdeValue);
/**
- * This is a method for creating a {@link SpillableByteMultiset}. This method
+ * This is a method for creating a {@link SpillableMultiset}. This method
* auto-generates an identifier for the data structure.
* @param <T> The type of the elements.
- * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
- * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
- * @return A {@link SpillableByteMultiset}.
+ * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to.
+ * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
+ * @return A {@link SpillableMultiset}.
*/
- <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
/**
- * This is a method for creating a {@link SpillableByteMultiset}.
+ * This is a method for creating a {@link SpillableMultiset}.
* @param <T> The type of the elements.
- * @param identifier The identifier for this {@link SpillableByteMultiset}.
- * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
- * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
- * @return A {@link SpillableByteMultiset}.
+ * @param identifier The identifier for this {@link SpillableMultiset}.
+ * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to.
+ * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
+ * @return A {@link SpillableMultiset}.
*/
- <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
/**
* This is a method for creating a {@link SpillableQueue}. This method
@@ -153,7 +153,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
/**
* This is a method for creating a {@link SpillableQueue}.
* @param <T> The type of the data stored in the {@link SpillableQueue}.
- * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
+ * @param identifier The identifier for this {@link SpillableQueue}.
* @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 9c3defc..aad219d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -66,14 +66,14 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
}
- public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+ public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
{
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
componentList.add(list);
return list;
}
- public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
{
identifierGenerator.register(identifier);
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
@@ -81,39 +81,39 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
return list;
}
- public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+ public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue)
{
- SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifierGenerator.next(),
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
bucket, serdeKey, serdeValue);
componentList.add(map);
return map;
}
- public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+ public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue)
{
identifierGenerator.register(identifier);
- SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
componentList.add(map);
return map;
}
- public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+ public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
Slice> serdeKey, Serde<V, Slice> serdeValue)
{
- SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+ SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
componentList.add(map);
return map;
}
- public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+ public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue)
{
identifierGenerator.register(identifier);
- SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+ SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifier, bucket, serdeKey, serdeValue);
componentList.add(map);
return map;
@@ -128,12 +128,12 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
return map;
}
- public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
+ public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
new file mode 100644
index 0000000..016aeec
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link Map}. Writes are buffered in a {@link WindowBoundedMapCache} and written to the {@link SpillableStateStore} when {@link #endWindow()} is called.
+ * @param <K> The types of keys.
+ * @param <V> The types of values.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
+ Serializable
+{
+ private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
+ private transient MutableInt tempOffset = new MutableInt(); // reused as the read offset for deserialization in get()
+
+ @NotNull
+ private SpillableStateStore store;
+ @NotNull
+ private byte[] identifier; // prepended to every serialized key so entries of different structures don't collide
+ private long bucket;
+ @NotNull
+ private Serde<K, Slice> serdeKey;
+ @NotNull
+ private Serde<V, Slice> serdeValue;
+
+ private int size = 0; // number of live entries, adjusted by put() and remove()
+
+ private SpillableMapImpl()
+ {
+ //for kryo
+ }
+
+ /**
+ * Creates a {@link SpillableMapImpl}.
+ * @param store The {@link SpillableStateStore} to spill to.
+ * @param identifier The Id of this {@link SpillableMapImpl}.
+ * @param bucket The Id of the bucket used to store this
+ * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+ */
+ public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.bucket = bucket;
+ this.serdeKey = Preconditions.checkNotNull(serdeKey);
+ this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return this.store;
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ @Override
+ public boolean containsKey(Object o)
+ {
+ return get(o) != null;
+ }
+
+ @Override
+ public boolean containsValue(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V get(Object o)
+ {
+ K key = (K)o;
+
+ if (cache.getRemovedKeys().contains(key)) { // removed in the current window: shadows any value still in the store
+ return null;
+ }
+
+ V val = cache.get(key);
+
+ if (val != null) {
+ return val;
+ }
+
+ Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+ if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { // empty slice = tombstone written by endWindow()
+ return null;
+ }
+
+ tempOffset.setValue(0);
+ return serdeValue.deserialize(valSlice, tempOffset);
+ }
+
+ @Override
+ public V put(K k, V v)
+ {
+ V value = get(k);
+
+ if (value == null) { // key was absent, so the map grows
+ size++;
+ }
+
+ cache.put(k, v);
+
+ return value;
+ }
+
+ @Override
+ public V remove(Object o)
+ {
+ V value = get(o);
+
+ if (value != null) {
+ size--;
+ }
+
+ cache.remove((K)o);
+
+ return value;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> map)
+ {
+ for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<K> keySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<V> values()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (K key: cache.getChangedKeys()) {
+ store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+ serdeValue.serialize(cache.get(key)));
+ }
+
+ for (K key: cache.getRemovedKeys()) { // removals are persisted as empty slices, which get() reads back as absent
+ store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+ new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+ }
+
+ cache.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 122cd2d..c2741b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -110,7 +110,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
@NotNull
private SpillableStateStore store;
@NotNull
- private SpillableByteMapImpl<T, ListNode<T>> map;
+ private SpillableMapImpl<T, ListNode<T>> map;
private T head;
private int size;
@@ -139,7 +139,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
{
this.store = Preconditions.checkNotNull(store);
- map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+ map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
}
public void setSize(int size)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index c227ed7..98f60d2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -61,7 +61,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
@NotNull
- private SpillableByteMapImpl<Slice, Pair<Integer, V>> map;
+ private SpillableMapImpl<Slice, Pair<Integer, V>> map;
private SpillableStateStore store;
private byte[] identifier;
private long bucket;
@@ -93,7 +93,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
this.serdeKey = Preconditions.checkNotNull(serdeKey);
this.serdeValue = Preconditions.checkNotNull(serdeValue);
- map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+ map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
}
public SpillableStateStore getStore()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ac77d1b..ac386ab 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -53,7 +53,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
protected Serde<K, Slice> keySerde;
protected Serde<V, Slice> valueSerde;
- protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap;
+ protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
private class KVIterator implements Iterator<Map.Entry<K, V>>
@@ -181,7 +181,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
}
if (windowKeyToValueMap == null) {
- windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde);
+ windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde);
}
if (windowToKeysMap == null) {
windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 81f5dbb..6666381 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -45,7 +45,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
private Serde<Window, Slice> windowSerde;
private Serde<T, Slice> valueSerde;
- protected Spillable.SpillableByteMap<Window, T> windowToDataMap;
+ protected Spillable.SpillableMap<Window, T> windowToDataMap;
public SpillableWindowedPlainStorage()
{
@@ -134,7 +134,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
valueSerde = new SerdeKryoSlice<>();
}
if (windowToDataMap == null) {
- windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde);
+ windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
new file mode 100644
index 0000000..82fb340
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link SpillableArrayListMultimapImpl}, run against both an {@link InMemSpillableStateStore} and the
+ * managed-state store supplied by {@link SpillableTestUtils.TestMeta}.
+ */
+public class SpillableArrayListMultimapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleMultiKeyTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleMultiKeyTestHelper(store);
+  }
+
+  @Test
+  public void simpleMultiKeyManagedStateTest()
+  {
+    simpleMultiKeyTestHelper(testMeta.store);
+  }
+
+  /**
+   * Populates three keys ("a", "b" and "c") one after another and checks after each one that the multimap size
+   * has grown by one. Window ids passed to the store and the map keep increasing across all helper calls.
+   *
+   * @param store the state store backing the multimap under test
+   */
+  public void simpleMultiKeyTestHelper(SpillableStateStore store)
+  {
+    SpillableArrayListMultimapImpl<String, String> map =
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(2, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    // Keep the returned window id; discarding it would make the next beginWindow reuse a window id that the
+    // helper has already processed.
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(3, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  /**
+   * Exercises get/put/containsKey and the returned list's add/addAll/set for a single key over four consecutive
+   * windows, checking both the list contents and the values read back from the store (size entry and list).
+   *
+   * @param store the state store backing {@code map}
+   * @param map the multimap under test; {@code key} must not be present yet
+   * @param key the key to populate
+   * @param nextWindowId the last window id already used; the first window opened here is {@code nextWindowId + 1}
+   * @return the id of the last window opened by this method
+   */
+  public long simpleMultiKeyTestHelper(SpillableStateStore store,
+      SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
+  {
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    SerdeIntSlice serdeInt = new SerdeIntSlice();
+
+    Slice keySlice = serdeString.serialize(key);
+
+    // The store key checked below is the multimap identifier followed by the serialized user key.
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertNull(map.get(key));
+
+    Assert.assertFalse(map.containsKey(key));
+
+    map.put(key, "a");
+
+    Assert.assertTrue(map.containsKey(key));
+
+    List<String> list1 = map.get(key);
+    Assert.assertEquals(1, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+
+    list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+    Assert.assertEquals(8, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("a", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("e", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g"));
+
+    List<String> list2 = map.get(key);
+
+    Assert.assertEquals(8, list2.size());
+
+    Assert.assertEquals("a", list2.get(0));
+    Assert.assertEquals("a", list2.get(1));
+    Assert.assertEquals("b", list2.get(2));
+    Assert.assertEquals("c", list2.get(3));
+    Assert.assertEquals("d", list2.get(4));
+    Assert.assertEquals("e", list2.get(5));
+    Assert.assertEquals("f", list2.get(6));
+    Assert.assertEquals("g", list2.get(7));
+
+    list2.add("tt");
+    list2.add("ab");
+    list2.add("99");
+    list2.add("oo");
+
+    Assert.assertEquals("tt", list2.get(8));
+    Assert.assertEquals("ab", list2.get(9));
+    Assert.assertEquals("99", list2.get(10));
+    Assert.assertEquals("oo", list2.get(11));
+
+    Assert.assertEquals(12, list2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(12, list2.size());
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g", "tt", "ab", "99", "oo"));
+
+    List<String> list3 = map.get(key);
+
+    list3.set(1, "111");
+    list3.set(3, "222");
+    list3.set(5, "333");
+    list3.set(11, "444");
+
+    Assert.assertEquals("a", list3.get(0));
+    Assert.assertEquals("111", list3.get(1));
+    Assert.assertEquals("b", list3.get(2));
+    Assert.assertEquals("222", list3.get(3));
+    Assert.assertEquals("d", list3.get(4));
+    Assert.assertEquals("333", list3.get(5));
+    Assert.assertEquals("f", list3.get(6));
+    Assert.assertEquals("g", list3.get(7));
+    Assert.assertEquals("tt", list3.get(8));
+    Assert.assertEquals("ab", list3.get(9));
+    Assert.assertEquals("99", list3.get(10));
+    Assert.assertEquals("444", list3.get(11));
+
+    Assert.assertEquals(12, list3.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333",
+        "f", "g", "tt", "ab", "99", "444"));
+
+    map.endWindow();
+    store.endWindow();
+
+    return nextWindowId;
+  }
+
+  /**
+   * Takes a Kryo clone of the multimap at a checkpoint after populating key "a", makes a further change, then
+   * restores the clone with that checkpoint as the activation window and verifies that only the checkpointed
+   * values are visible.
+   */
+  @Test
+  public void recoveryTestWithManagedState()
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableArrayListMultimapImpl<String, String> map =
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    long activationWindow = nextWindowId;
+    store.beforeCheckpoint(nextWindowId);
+    SpillableArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
+    store.checkpointed(nextWindowId);
+    store.committed(nextWindowId);
+
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    List<String> list1 = map.get("a");
+
+    Assert.assertEquals(12, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+
+    list1.add("111");
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+    Assert.assertEquals("111", list1.get(12));
+
+    Assert.assertEquals(13, list1.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+
+    map = clonedMap;
+    store = map.getStore();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    store.setup(context);
+    map.setup(context);
+    nextWindowId = activationWindow + 1;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    Slice keySlice = serdeString.serialize("a");
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
+        "333", "f", "g", "tt", "ab", "99", "444"));
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals(12, map.get("a").size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  /**
+   * Puts {@code numOfEntry} random key/value pairs into the multimap within a single window.
+   */
+  @Test
+  public void testLoad()
+  {
+    Random random = new Random();
+    final int keySize = 1000000;
+    final int valueSize = 100000000;
+    final int numOfEntry = 100000;
+
+    SpillableStateStore store = testMeta.store;
+    SpillableArrayListMultimapImpl<String, String> multimap = new SpillableArrayListMultimapImpl<>(
+        store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+    store.setup(context);
+    multimap.setup(context);
+
+    store.beginWindow(1);
+    multimap.beginWindow(1);
+    for (int i = 0; i < numOfEntry; ++i) {
+      multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize)));
+    }
+    multimap.endWindow();
+    store.endWindow();
+
+    multimap.teardown();
+    store.teardown();
+  }
+}