You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/10/31 19:36:08 UTC
apex-malhar git commit: APEXMALHAR-2244 #comment Use
TimeUnifiedManageStateStore for Spillable Data Structure
Repository: apex-malhar
Updated Branches:
refs/heads/master 27272a588 -> 16edf3067
APEXMALHAR-2244 #comment Use TimeUnifiedManageStateStore for Spillable Data Structure
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16edf306
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16edf306
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16edf306
Branch: refs/heads/master
Commit: 16edf3067226735682ecce63eedd0b120a708427
Parents: 27272a5
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Oct 27 14:06:51 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Oct 31 11:14:04 2016 -0700
----------------------------------------------------------------------
.../state/managed/AbstractManagedStateImpl.java | 2 +-
.../malhar/lib/state/managed/TimeExtractor.java | 27 +++
.../spillable/SpillableComplexComponent.java | 40 ++++
.../SpillableComplexComponentImpl.java | 44 +++-
.../lib/state/spillable/SpillableMapImpl.java | 45 +++-
.../lib/state/spillable/SpillableSetImpl.java | 23 +-
.../spillable/SpillableSetMultimapImpl.java | 95 ++++++--
...agedTimeUnifiedStateSpillableStateStore.java | 29 +++
.../impl/SpillableWindowedKeyedStorage.java | 4 +-
.../impl/SpillableWindowedPlainStorage.java | 2 +-
.../window/impl/WindowKeyPairTimeExtractor.java | 40 ++++
.../lib/window/impl/WindowTimeExtractor.java | 35 +++
.../state/spillable/SpillableMapImplTest.java | 234 ++++++++-----------
.../state/spillable/SpillableSetImplTest.java | 27 ++-
.../spillable/SpillableSetMultimapImplTest.java | 21 +-
.../lib/state/spillable/SpillableTestUtils.java | 4 +
.../spillable/TestStringTimeExtractor.java | 36 +++
.../window/SpillableWindowedStorageTest.java | 19 +-
.../malhar/lib/window/WindowedOperatorTest.java | 90 +++----
19 files changed, 580 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 20271b0..1c52c31 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -316,7 +316,7 @@ public abstract class AbstractManagedStateImpl
protected int getBucketIdx(long bucketId)
{
- return (int)(bucketId % numBuckets);
+ return (int)Math.abs(bucketId % numBuckets);
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
new file mode 100644
index 0000000..e70e80f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * A way to extract time from data
+ */
+public interface TimeExtractor<T>
+{
+ long getTime(T t);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index 542a914..b6ec6a2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -18,6 +18,7 @@
*/
package org.apache.apex.malhar.lib.state.spillable;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.Spillable.SpillableComponent;
import org.apache.apex.malhar.lib.utils.serde.Serde;
@@ -68,6 +69,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
Serde<V> serdeValue);
/**
+ * This is a method for creating a {@link SpillableMap}. This method
+ * auto-generates an identifier for the data structure.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+ * @param serdeValue The Serializer/Deserializer to use for the map's values.
+ * @param timeExtractor a util object to extract time from key.
+ * @return A {@link SpillableMap}.
+ */
+ <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+ /**
* This is a method for creating a {@link SpillableMap}.
* @param <K> The type of the keys.
* @param <V> The type of the values.
@@ -81,6 +95,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
Serde<K> serdeKey, Serde<V> serdeValue);
/**
+ * This is a method for creating a {@link SpillableMap}.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ * @param identifier The identifier for this {@link SpillableMap}.
+ * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+ * @param serdeValue The Serializer/Deserializer to use for the map's values.
+ * @param timeExtractor a util object to extract time from key.
+ * @return A {@link SpillableMap}.
+ */
+ <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier,
+ Serde<K> serdeKey, Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+ /**
* This is a method for creating a {@link SpillableListMultimap}. This method
* auto-generates an identifier for the data structure.
* @param <K> The type of the keys.
@@ -118,6 +145,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
<K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
/**
+ * This is a method for creating a {@link SpillableSetMultimap}.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values in the map's sets.
+ * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to.
+ * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+ * @param serdeValue The Serializer/Deserializer to use for the values in the map's sets.
+ * @param timeExtractor a util object to extract time from key.
+ * @return A {@link SpillableSetMultimap}.
+ */
+ <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor);
+
+ /**
* This is a method for creating a {@link SpillableMultiset}. This method
* auto-generates an identifier for the data structure.
* @param <T> The type of the elements.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 1a3f550..1d9fbc6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -23,6 +23,7 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.hadoop.classification.InterfaceStability;
@@ -75,7 +76,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
@Override
public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
{
- SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
+ SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifierGenerator.next(), store, serde);
componentList.add(list);
return list;
}
@@ -84,7 +85,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
{
identifierGenerator.register(identifier);
- SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+ SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifier, store, serde);
bucketIds.add(bucket);
componentList.add(list);
return list;
@@ -94,7 +95,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
Serde<V> serdeValue)
{
- SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(),
bucket, serdeKey, serdeValue);
bucketIds.add(bucket);
componentList.add(map);
@@ -106,16 +107,35 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
- SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, bucket, serdeKey, serdeValue);
bucketIds.add(bucket);
componentList.add(map);
return map;
}
@Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+ {
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(), serdeKey, serdeValue, timeExtractor);
+ componentList.add(map);
+ return map;
+ }
+
+ @Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+ {
+ identifierGenerator.register(identifier);
+ SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, serdeKey, serdeValue, timeExtractor);
+ componentList.add(map);
+ return map;
+ }
+
+ @Override
public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
- SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
+ SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
bucketIds.add(bucket);
componentList.add(map);
@@ -128,7 +148,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
- SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
+ SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store,
identifier, bucket, serdeKey, serdeValue);
bucketIds.add(bucket);
componentList.add(map);
@@ -138,7 +158,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
@Override
public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
- SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
+ SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
bucketIds.add(bucket);
componentList.add(map);
@@ -146,6 +166,16 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
}
@Override
+ public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+ {
+ SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store,
+ identifierGenerator.next(), bucket, serdeKey, serdeValue, timeExtractor);
+ componentList.add(map);
+ return map;
+ }
+
+ @Override
public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 5fa39d7..e7071a2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.apex.malhar.lib.utils.serde.Serde;
@@ -55,10 +56,11 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
private transient Input tmpInput = new Input();
+ private TimeExtractor<K> timeExtractor;
+
@NotNull
private SpillableStateStore store;
- @NotNull
- private byte[] identifier;
+
private long bucket;
private int size = 0;
@@ -76,16 +78,32 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
* @param identifier The Id of this {@link SpillableMapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
- * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
- * @param keySerde The {@link Serde} to use when serializing and deserializing values.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
*/
- public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
- Serde<V> valueSerde)
+ public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+ }
+
+ /**
+ * Creates a {@link SpillableMapImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param identifier The Id of this {@link SpillableMapImpl} in the provided
+ * {@link SpillableStateStore}.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+ * @param timeExtractor Extracts the time from each key; that time is used as the bucket the entry is stored in
+ */
+ public SpillableMapImpl(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+ this.timeExtractor = timeExtractor;
}
public SpillableStateStore getStore()
@@ -132,7 +150,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
return val;
}
- Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
+ Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false));
if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
return null;
@@ -219,12 +237,12 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
public void endWindow()
{
for (K key: cache.getChangedKeys()) {
- store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+ store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true),
keyValueSerdeManager.serializeValue(cache.get(key)));
}
for (K key: cache.getRemovedKeys()) {
- store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
+ store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
}
cache.endWindow();
keyValueSerdeManager.resetReadBuffer();
@@ -234,4 +252,9 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
public void teardown()
{
}
+
+ private long getBucket(K key)
+ {
+ return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 0dfc411..221cd38 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.hadoop.classification.InterfaceStability;
@@ -89,8 +90,6 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
}
@NotNull
- private SpillableStateStore store;
- @NotNull
private SpillableMapImpl<T, ListNode<T>> map;
private T head;
@@ -103,7 +102,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
public SpillableStateStore getStore()
{
- return store;
+ return map.getStore();
}
/**
@@ -118,9 +117,23 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
@NotNull SpillableStateStore store,
@NotNull Serde<T> serde)
{
- this.store = Preconditions.checkNotNull(store);
+ map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, bucketId, serde, new ListNodeSerde<>(serde));
+ }
- map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
+ /**
+ * Creates a {@link SpillableSetImpl} that spills to the provided
+ * {@link SpillableStateStore}.
+ * @param prefix The Id of this {@link SpillableSetImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param serde The {@link Serde} to use when serializing and deserializing data.
+ * @param timeExtractor Extracts the time from each element; that time is used to decide where the data goes.
+ */
+ public SpillableSetImpl(@NotNull byte[] prefix,
+ @NotNull SpillableStateStore store,
+ @NotNull Serde<T> serde,
+ @NotNull TimeExtractor timeExtractor)
+ {
+ map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor);
}
public void setSize(int size)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 76e47f2..fb88d9c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,10 +27,11 @@ import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
import org.apache.apex.malhar.lib.utils.serde.IntSerde;
import org.apache.apex.malhar.lib.utils.serde.PairSerde;
-import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -55,21 +56,46 @@ import com.datatorrent.netlet.util.Slice;
public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>,
Spillable.SpillableComponent
{
+
+ private static class FixedTimeExtractor<V> implements TimeExtractor<V>
+ {
+
+ private long fixedTime;
+
+ private FixedTimeExtractor(long fixedTime)
+ {
+ this.fixedTime = fixedTime;
+ }
+
+ private FixedTimeExtractor()
+ {
+ // For kryo
+ }
+
+ @Override
+ public long getTime(V v)
+ {
+ return fixedTime;
+ }
+
+ }
+
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
@NotNull
- private SpillableMapImpl<Slice, Pair<Integer, V>> map;
+ private SpillableMapImpl<K, Pair<Integer, V>> map;
private SpillableStateStore store;
- private byte[] identifier;
private long bucket;
private Serde<V> valueSerde;
private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
- protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
- protected transient Context.OperatorContext context;
+ private TimeExtractor<K> timeExtractor = null;
+ private AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+ private transient Context.OperatorContext context;
+
private SpillableSetMultimapImpl()
{
// for kryo
@@ -81,8 +107,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
* @param identifier The Id of this {@link SpillableSetMultimapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
Serde<K> keySerde,
@@ -93,7 +119,32 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
this.valueSerde = Preconditions.checkNotNull(valueSerde);
keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
- map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
+ map = new SpillableMapImpl<>(store, identifier, bucket, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde));
+ }
+
+
+ /**
+ * Creates a {@link SpillableSetMultimapImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
+ * @param bucket The Id of the bucket used to store this
+ * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
+ * @param timeExtractor The {@link TimeExtractor} to be used to retrieve time from key
+ */
+ public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+ Serde<K> keySerde,
+ Serde<V> valueSerde,
+ TimeExtractor<K> timeExtractor)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.bucket = bucket;
+ this.valueSerde = Preconditions.checkNotNull(valueSerde);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
+ this.timeExtractor = timeExtractor;
+
+ map = new SpillableMapImpl<>(store, identifier, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde), timeExtractor);
}
public SpillableStateStore getStore()
@@ -112,14 +163,23 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
if (spillableSet == null) {
- Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
+ long keyTime = -1;
+ Pair<Integer, V> meta;
+ if (timeExtractor != null) {
+ keyTime = timeExtractor.getTime(key);
+ }
+ meta = map.get(key);
if (meta == null) {
return null;
}
Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
- spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+ if (timeExtractor != null) {
+ spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime));
+ } else {
+ spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+ }
spillableSet.setSize(meta.getLeft());
spillableSet.setHead(meta.getRight());
spillableSet.setup(context);
@@ -166,8 +226,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper((K)key);
if (spillableSet != null) {
cache.remove((K)key);
- Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
- map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
+ map.put((K)key, new ImmutablePair<>(0, spillableSet.getHead()));
spillableSet.clear();
removedSets.add(spillableSet);
}
@@ -199,8 +258,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
if (cache.contains((K)key)) {
return true;
}
- Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
- Pair<Integer, V> meta = map.get(keySlice);
+ Pair<Integer, V> meta = map.get((K)key);
return meta != null && meta.getLeft() > 0;
}
@@ -227,7 +285,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper(key);
if (spillableSet == null) {
- spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+ if (timeExtractor == null) {
+ spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+ } else {
+ spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key)));
+ }
spillableSet.setup(context);
cache.put(key, spillableSet);
}
@@ -304,8 +366,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
spillableSet.endWindow();
- map.put(keyValueSerdeManager.serializeMetaKey(key, true),
- new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
+ map.put(key, new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
}
for (SpillableSetImpl removedSet : removedSets) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
new file mode 100644
index 0000000..207cb31
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.managed;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+
+/**
+ * A {@link ManagedTimeUnifiedStateImpl} that also implements {@link SpillableStateStore}.
+ */
+public class ManagedTimeUnifiedStateSpillableStateStore extends ManagedTimeUnifiedStateImpl implements SpillableStateStore
+{
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ef111b3..d41c494 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -180,10 +180,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
}
if (windowKeyToValueMap == null) {
- windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde);
+ windowKeyToValueMap = scc.newSpillableMap(windowKeyPairSerde, valueSerde, new WindowKeyPairTimeExtractor());
}
if (windowToKeysMap == null) {
- windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);
+ windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde, new WindowTimeExtractor());
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 9a8a291..f9bbc17 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -133,7 +133,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
valueSerde = new GenericSerde<>();
}
if (windowToDataMap == null) {
- windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
+ windowToDataMap = scc.newSpillableMap(windowSerde, valueSerde, new WindowTimeExtractor());
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
new file mode 100644
index 0000000..ecf63a5
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * A {@link TimeExtractor} that extracts the time from a {@link Pair} of {@link Window} and key.
+ * The key does not affect the result, so the key type is left generic; only the window is used.
+ */
+public class WindowKeyPairTimeExtractor<K> implements TimeExtractor<Pair<Window, K>>
+{
+
+ private final WindowTimeExtractor windowTimeExtractor = new WindowTimeExtractor();
+
+ @Override
+ public long getTime(Pair<Window, K> windowKeyPair)
+ {
+ return windowTimeExtractor.getTime(windowKeyPair.getKey());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
new file mode 100644
index 0000000..aee389a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+import org.apache.apex.malhar.lib.window.Window;
+
+/**
+ * A {@link TimeExtractor} to extract time from {@link Window}
+ */
+public class WindowTimeExtractor implements TimeExtractor<Window>
+{
+ @Override
+ public long getTime(Window window)
+ {
+ return window.getBeginTimestamp() + window.getDurationMillis();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index a96a8fd..760bc5c 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -21,7 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
@@ -31,31 +33,46 @@ import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.util.KryoCloneUtils;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@RunWith(JUnitParamsRunner.class)
public class SpillableMapImplTest
{
public static final byte[] ID1 = new byte[]{(byte)0};
public static final byte[] ID2 = new byte[]{(byte)1};
+ public static final TestStringTimeExtractor TE = new TestStringTimeExtractor();
+
+ private SpillableStateStore store;
+
+ private TimeExtractor<String> te = null;
+
+
@Rule
public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
- @Test
- public void simpleGetAndPutTest()
- {
- InMemSpillableStateStore store = new InMemSpillableStateStore();
- simpleGetAndPutTestHelper(store);
+ private void setup(String opt)
+ {
+ if (opt.equals("InMem")) {
+ store = new InMemSpillableStateStore();
+ te = null;
+ } else if (opt.equals("ManagedState")) {
+ store = testMeta.store;
+ te = null;
+ } else {
+ store = testMeta.timeStore;
+ te = TE;
+ }
}
@Test
- public void simpleGetAndPutManagedStateTest()
- {
- simpleGetAndPutTestHelper(testMeta.store);
- }
-
- private void simpleGetAndPutTestHelper(SpillableStateStore store)
+ @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+ public void simpleGetAndPutTest(String opt)
{
+ setup(opt);
SpillableMapImpl<String, String> map = createSpillableMap(store);
store.setup(testMeta.operatorContext);
@@ -73,15 +90,9 @@ public class SpillableMapImplTest
Assert.assertEquals(3, map.size());
- Assert.assertEquals("1", map.get("a"));
- Assert.assertEquals("2", map.get("b"));
- Assert.assertEquals("3", map.get("c"));
- Assert.assertEquals(null, map.get("d"));
+ assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"});
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null});
map.endWindow();
store.endWindow();
@@ -93,17 +104,11 @@ public class SpillableMapImplTest
store.beginWindow(windowId);
map.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", "2", "3", null});
Assert.assertEquals(3, map.size());
- Assert.assertEquals("1", map.get("a"));
- Assert.assertEquals("2", map.get("b"));
- Assert.assertEquals("3", map.get("c"));
- Assert.assertEquals(null, map.get("d"));
+ assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"});
map.put("d", "4");
map.put("e", "5");
@@ -111,16 +116,9 @@ public class SpillableMapImplTest
Assert.assertEquals(6, map.size());
- Assert.assertEquals("4", map.get("d"));
- Assert.assertEquals("5", map.get("e"));
- Assert.assertEquals("6", map.get("f"));
+ assertMultiEqualsFromMap(map, new String[]{"4", "5", "6"}, new String[]{"d", "e", "f"});
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", "2", "3", null, null, null});
map.endWindow();
store.endWindow();
@@ -132,13 +130,8 @@ public class SpillableMapImplTest
store.beginWindow(windowId);
map.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
- SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", "2", "3", "4", "5", "6", null});
map.endWindow();
store.endWindow();
@@ -150,28 +143,43 @@ public class SpillableMapImplTest
store.teardown();
}
- @Test
- public void simpleRemoveTest()
+ private void multiValueCheck(String[] keys, byte[] samePrefix, String[] expectedVal)
{
- InMemSpillableStateStore store = new InMemSpillableStateStore();
- simpleRemoveTestHelper(store);
+ for (int i = 0; i < keys.length; i++) {
+ SpillableTestUtils.checkValue(store, _bid(keys[i], te), keys[i], samePrefix, expectedVal[i]);
+ }
}
+ private void assertMultiEqualsFromMap(SpillableMapImpl<String, String> map, String[] expectedV, String[] keys)
+ {
+ for (int i = 0; i < expectedV.length; i++) {
+ Assert.assertEquals(expectedV[i], map.get(keys[i]));
+ }
+ }
- @Test
- public void simpleRemoveManagedStateTest()
+ private long _bid(String key, TimeExtractor<String> te)
{
- simpleRemoveTestHelper(testMeta.store);
+ if (te != null) {
+ return te.getTime(key);
+ } else {
+ return 0L;
+ }
}
- protected SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
+ private SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
{
- return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(),
- new StringSerde());
+ if (te == null) {
+ return new SpillableMapImpl<>(store,ID1,0L,new StringSerde(), new StringSerde());
+ } else {
+ return new SpillableMapImpl<>(store,ID1,new StringSerde(), new StringSerde(), te);
+ }
}
- private void simpleRemoveTestHelper(SpillableStateStore store)
+ @Test
+ @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+ public void simpleRemoveTest(String opt)
{
+ setup(opt);
SpillableMapImpl<String, String> map = createSpillableMap(store);
store.setup(testMeta.operatorContext);
@@ -199,10 +207,7 @@ public class SpillableMapImplTest
Assert.assertEquals(1, map.size());
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null});
map.endWindow();
store.endWindow();
@@ -210,10 +215,7 @@ public class SpillableMapImplTest
store.checkpointed(windowId);
store.committed(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", null, null, null});
windowId++;
store.beginWindow(windowId);
@@ -236,12 +238,7 @@ public class SpillableMapImplTest
Assert.assertEquals("5", map.get("e"));
Assert.assertEquals("6", map.get("f"));
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", null, null, null, null, null});
map.endWindow();
store.endWindow();
@@ -253,13 +250,7 @@ public class SpillableMapImplTest
store.beginWindow(windowId);
map.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
- SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null});
map.remove("a");
map.remove("d");
@@ -271,13 +262,7 @@ public class SpillableMapImplTest
Assert.assertEquals("6", map.get("f"));
Assert.assertEquals(null, map.get("g"));
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
- SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null});
map.endWindow();
store.endWindow();
@@ -289,13 +274,7 @@ public class SpillableMapImplTest
store.beginWindow(windowId);
map.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
- SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
- SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+ multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{null, null, null, null, "5", "6", null});
map.endWindow();
store.endWindow();
@@ -308,29 +287,21 @@ public class SpillableMapImplTest
}
@Test
- public void multiMapPerBucketTest()
- {
- InMemSpillableStateStore store = new InMemSpillableStateStore();
-
- multiMapPerBucketTestHelper(store);
- }
-
- @Test
- public void multiMapPerBucketManagedStateTest()
- {
- multiMapPerBucketTestHelper(testMeta.store);
- }
-
- public void multiMapPerBucketTestHelper(SpillableStateStore store)
+ @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"})
+ public void multiMapPerBucketTest(String opt)
{
+ setup(opt);
StringSerde sss = new StringSerde();
- SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L,
- new StringSerde(),
- new StringSerde());
- SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L,
- new StringSerde(),
- new StringSerde());
+ SpillableMapImpl<String, String> map1 = null;
+ SpillableMapImpl<String, String> map2 = null;
+ if (te == null) {
+ map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss);
+ map2 = new SpillableMapImpl<>(store, ID2, 0L, sss, sss);
+ } else {
+ map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te);
+ map2 = new SpillableMapImpl<>(store, ID2, sss, sss, te);
+ }
store.setup(testMeta.operatorContext);
map1.setup(testMeta.operatorContext);
@@ -372,12 +343,9 @@ public class SpillableMapImplTest
map1.beginWindow(windowId);
map2.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+ multiValueCheck(new String[]{"a", "b"}, ID1, new String[]{"1", "2"});
- SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
- SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
- SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
+ multiValueCheck(new String[]{"a", "b", "c"}, ID2, new String[]{"a1", null, "3"});
map1.remove("a");
@@ -395,8 +363,8 @@ public class SpillableMapImplTest
map1.beginWindow(windowId);
map2.beginWindow(windowId);
- SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
- SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+ multiValueCheck(new String[]{"a"}, ID1, new String[]{null});
+ multiValueCheck(new String[]{"a"}, ID2, new String[]{"a1"});
map1.endWindow();
map2.endWindow();
@@ -410,18 +378,22 @@ public class SpillableMapImplTest
}
@Test
- public void recoveryWithManagedStateTest() throws Exception
+ @Parameters({"ManagedState","TimeUnifiedManagedState"})
+ public void recoveryWithManagedStateTest(String opt) throws Exception
{
+ setup(opt);
StringSerde sss = new StringSerde();
+ SpillableMapImpl<String, String> map1 = null;
+ if (te == null) {
+ map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss);
+ } else {
+ map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te);
+ }
- SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L,
- new StringSerde(),
- new StringSerde());
-
- testMeta.store.setup(testMeta.operatorContext);
+ store.setup(testMeta.operatorContext);
map1.setup(testMeta.operatorContext);
- testMeta.store.beginWindow(0);
+ store.beginWindow(0);
map1.beginWindow(0);
map1.put("x", "1");
map1.put("y", "2");
@@ -429,9 +401,9 @@ public class SpillableMapImplTest
map1.put("zz", "33");
Assert.assertEquals(4, map1.size());
map1.endWindow();
- testMeta.store.endWindow();
+ store.endWindow();
- testMeta.store.beginWindow(1);
+ store.beginWindow(1);
map1.beginWindow(1);
Assert.assertEquals(4, map1.size());
map1.put("x", "4");
@@ -439,13 +411,13 @@ public class SpillableMapImplTest
map1.remove("zz");
Assert.assertEquals(3, map1.size());
map1.endWindow();
- testMeta.store.endWindow();
- testMeta.store.beforeCheckpoint(1);
- testMeta.store.checkpointed(1);
+ store.endWindow();
+ store.beforeCheckpoint(1);
+ store.checkpointed(1);
SpillableMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
- testMeta.store.beginWindow(2);
+ store.beginWindow(2);
map1.beginWindow(2);
Assert.assertEquals(3, map1.size());
map1.put("x", "6");
@@ -453,11 +425,11 @@ public class SpillableMapImplTest
map1.put("w", "8");
Assert.assertEquals(4, map1.size());
map1.endWindow();
- testMeta.store.endWindow();
+ store.endWindow();
// simulating crash here
map1.teardown();
- testMeta.store.teardown();
+ store.teardown();
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
index d0343e1..3f078cf 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
@@ -37,24 +38,38 @@ public class SpillableSetImplTest
@Rule
public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+ public TimeExtractor<String> te = null;
+
@Test
- public void simpleAddGetAndSetTest1()
+ public void simpleAddGetAndSetTest()
{
InMemSpillableStateStore store = new InMemSpillableStateStore();
- simpleAddGetAndSetTest1Helper(store);
+ simpleAddGetAndSetTestHelper(store);
+ }
+
+ @Test
+ public void simpleAddGetAndSetTimeUnifiedManagedStateTest()
+ {
+ te = new TestStringTimeExtractor();
+ simpleAddGetAndSetTestHelper(testMeta.timeStore);
}
@Test
- public void simpleAddGetAndSetManagedStateTest1()
+ public void simpleAddGetAndSetManagedStateTest()
{
- simpleAddGetAndSetTest1Helper(testMeta.store);
+ simpleAddGetAndSetTestHelper(testMeta.store);
}
- public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+ public void simpleAddGetAndSetTestHelper(SpillableStateStore store)
{
- SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
+ SpillableSetImpl<String> set;
+ if (te == null) {
+ set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
+ } else {
+ set = new SpillableSetImpl<>(ID1, store, new StringSerde(), te);
+ }
store.setup(testMeta.operatorContext);
set.setup(testMeta.operatorContext);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 2f80628..bc1783c 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
@@ -46,6 +47,8 @@ public class SpillableSetMultimapImplTest
@Rule
public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+ public TimeExtractor<String> te = null;
+
@Test
public void simpleMultiKeyTest()
{
@@ -60,10 +63,22 @@ public class SpillableSetMultimapImplTest
simpleMultiKeyTestHelper(testMeta.store);
}
+ @Test
+ public void simpleMultiKeyTimeUnifiedManagedStateTest()
+ {
+ te = new TestStringTimeExtractor();
+ simpleMultiKeyTestHelper(testMeta.timeStore);
+ }
+
+
public void simpleMultiKeyTestHelper(SpillableStateStore store)
{
- SpillableSetMultimapImpl<String, String> map =
- new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
+ SpillableSetMultimapImpl<String, String> map = null;
+ if (te == null) {
+ map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
+ } else {
+ map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde(), te);
+ }
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -296,7 +311,7 @@ public class SpillableSetMultimapImplTest
store.endWindow();
}
- protected Serde<String> createStringSerde()
+ private Serde<String> createStringSerde()
{
return new StringSerde();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index d72b1f9..a312f04 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -27,6 +27,7 @@ import org.junit.runner.Description;
import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
@@ -57,6 +58,7 @@ public class SpillableTestUtils
public static class TestMeta extends TestWatcher
{
public ManagedStateSpillableStateStore store;
+ public ManagedTimeUnifiedStateSpillableStateStore timeStore;
public Context.OperatorContext operatorContext;
public String applicationPath;
@@ -65,8 +67,10 @@ public class SpillableTestUtils
{
TestUtils.deleteTargetTestClassFolder(description);
store = new ManagedStateSpillableStateStore();
+ timeStore = new ManagedTimeUnifiedStateSpillableStateStore();
applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+ ((FileAccessFSImpl)timeStore.getFileAccess()).setBasePath(applicationPath + "/" + "time_bucket_data");
operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
new file mode 100644
index 0000000..438555f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
+
+/**
+ * A TimeExtractor for Tests
+ * Get the time value from ASCII code of the first character
+ */
+public class TestStringTimeExtractor implements TimeExtractor<String>
+{
+ static long BASETIME = System.currentTimeMillis();
+ @Override
+ public long getTime(String s)
+ {
+ return s.toCharArray()[0] * 1000 + BASETIME - 7200000;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
index a44e454..afc5227 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -38,14 +38,16 @@ public class SpillableWindowedStorageTest
@Rule
public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+ public static long BASETIME = System.currentTimeMillis();
+
@Test
public void testWindowedPlainStorage()
{
- SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
- Window window1 = new Window.TimeWindow<>(1000, 10);
- Window window2 = new Window.TimeWindow<>(1010, 10);
- Window window3 = new Window.TimeWindow<>(1020, 10);
+ Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10);
+ Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10);
+ Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10);
storage.setSpillableComplexComponent(sccImpl);
/*
@@ -103,11 +105,11 @@ public class SpillableWindowedStorageTest
@Test
public void testWindowedKeyedStorage()
{
- SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>();
- Window window1 = new Window.TimeWindow<>(1000, 10);
- Window window2 = new Window.TimeWindow<>(1010, 10);
- Window window3 = new Window.TimeWindow<>(1020, 10);
+ Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10);
+ Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10);
+ Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10);
storage.setSpillableComplexComponent(sccImpl);
/*
@@ -118,7 +120,6 @@ public class SpillableWindowedStorageTest
storage.setup(testMeta.operatorContext);
storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
-
sccImpl.beginWindow(1000);
storage.put(window1, "x", 1);
storage.put(window2, "x", 2);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 4a1cef0..f898e2d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -57,6 +57,9 @@ import com.datatorrent.lib.util.KeyValPair;
@RunWith(Parameterized.class)
public class WindowedOperatorTest
{
+
+ public static final long BASE = (System.currentTimeMillis() / 1000) * 1000;
+
@Parameterized.Parameters
public static Collection<Object[]> testParameters()
{
@@ -90,7 +93,7 @@ public class WindowedOperatorTest
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
if (useSpillable) {
- sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
// TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
windowStateStorage = new InMemoryWindowedStorage<>();
SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>();
@@ -116,7 +119,7 @@ public class WindowedOperatorTest
{
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
if (useSpillable) {
- sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore);
// TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys.
windowStateStorage = new InMemoryWindowedStorage<>();
if (forSession) {
@@ -183,7 +186,7 @@ public class WindowedOperatorTest
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size());
Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
@@ -192,23 +195,22 @@ public class WindowedOperatorTest
WindowState windowState = entry.getValue();
Assert.assertEquals(-1, windowState.watermarkArrivalTime);
Assert.assertEquals(2L, plainDataStorage.get(window).longValue());
-
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
Assert.assertEquals(5L, plainDataStorage.get(window).longValue());
- windowedOperator.processWatermark(new WatermarkImpl(1200));
+ windowedOperator.processWatermark(new WatermarkImpl(BASE + 1200));
windowedOperator.endWindow();
Assert.assertTrue(windowState.watermarkArrivalTime >= 0);
Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
windowedOperator.beginWindow(2);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L));
Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue());
- windowedOperator.processWatermark(new WatermarkImpl(3000));
+ windowedOperator.processWatermark(new WatermarkImpl(BASE + 3000));
windowedOperator.endWindow();
Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
windowedOperator.beginWindow(3);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 120L, 5L)); // this tuple should be dropped
Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size());
Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
windowedOperator.endWindow();
@@ -238,8 +240,8 @@ public class WindowedOperatorTest
windowedOperator.output.setSink(sink);
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(2);
@@ -251,11 +253,11 @@ public class WindowedOperatorTest
Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
sink.collectedTuples.clear();
windowedOperator.beginWindow(4);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, 4L));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(5);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, 5L));
windowedOperator.endWindow();
switch (accumulationMode) {
case ACCUMULATING:
@@ -337,8 +339,8 @@ public class WindowedOperatorTest
windowedOperator.output.setSink(sink);
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(2);
@@ -376,7 +378,7 @@ public class WindowedOperatorTest
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
windowedOperator.setup(testMeta.operatorContext);
- Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+ Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
@@ -389,11 +391,11 @@ public class WindowedOperatorTest
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(testMeta.operatorContext);
- Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+ Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
Window window = windows.iterator().next();
- Assert.assertEquals(1000, window.getBeginTimestamp());
+ Assert.assertEquals(BASE + 1000, window.getBeginTimestamp());
Assert.assertEquals(1000, window.getDurationMillis());
}
@@ -403,19 +405,19 @@ public class WindowedOperatorTest
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
windowedOperator.setup(testMeta.operatorContext);
- Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
+ Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Window[] winArray = windows.toArray(new Window[]{});
Assert.assertEquals(5, winArray.length);
- Assert.assertEquals(800, winArray[0].getBeginTimestamp());
+ Assert.assertEquals(BASE + 800, winArray[0].getBeginTimestamp());
Assert.assertEquals(1000, winArray[0].getDurationMillis());
- Assert.assertEquals(1000, winArray[1].getBeginTimestamp());
+ Assert.assertEquals(BASE + 1000, winArray[1].getBeginTimestamp());
Assert.assertEquals(1000, winArray[1].getDurationMillis());
- Assert.assertEquals(1200, winArray[2].getBeginTimestamp());
+ Assert.assertEquals(BASE + 1200, winArray[2].getBeginTimestamp());
Assert.assertEquals(1000, winArray[2].getDurationMillis());
- Assert.assertEquals(1400, winArray[3].getBeginTimestamp());
+ Assert.assertEquals(BASE + 1400, winArray[3].getBeginTimestamp());
Assert.assertEquals(1000, winArray[3].getDurationMillis());
- Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
+ Assert.assertEquals(BASE + 1600, winArray[4].getBeginTimestamp());
Assert.assertEquals(1000, winArray[4].getDurationMillis());
windowedOperator.teardown();
}
@@ -430,14 +432,14 @@ public class WindowedOperatorTest
windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
+ Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(BASE + 1100L, new KeyValPair<>("a", 2L));
windowedOperator.processTuple(tuple);
Assert.assertEquals(1, sink.getCount(false));
Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
- Assert.assertEquals(1100L, window1.getBeginTimestamp());
+ Assert.assertEquals(BASE + 1100L, window1.getBeginTimestamp());
Assert.assertEquals(2000, window1.getDurationMillis());
Assert.assertEquals("a", window1.getKey());
Assert.assertEquals("a", out.getValue().getKey());
@@ -445,7 +447,7 @@ public class WindowedOperatorTest
sink.clear();
// extending an existing session window
- tuple = new Tuple.TimestampedTuple<>(2000L, new KeyValPair<>("a", 3L));
+ tuple = new Tuple.TimestampedTuple<>(BASE + 2000L, new KeyValPair<>("a", 3L));
windowedOperator.processTuple(tuple);
Assert.assertEquals(2, sink.getCount(false));
@@ -460,27 +462,27 @@ public class WindowedOperatorTest
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
- Assert.assertEquals(1100L, window2.getBeginTimestamp());
+ Assert.assertEquals(BASE + 1100L, window2.getBeginTimestamp());
Assert.assertEquals(2900, window2.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(5L, out.getValue().getValue().longValue());
sink.clear();
// a separate session window
- tuple = new Tuple.TimestampedTuple<>(5000L, new KeyValPair<>("a", 4L));
+ tuple = new Tuple.TimestampedTuple<>(BASE + 5000L, new KeyValPair<>("a", 4L));
windowedOperator.processTuple(tuple);
Assert.assertEquals(1, sink.getCount(false));
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
- Assert.assertEquals(5000L, window3.getBeginTimestamp());
+ Assert.assertEquals(BASE + 5000L, window3.getBeginTimestamp());
Assert.assertEquals(2000, window3.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(4L, out.getValue().getValue().longValue());
sink.clear();
// session window merging
- tuple = new Tuple.TimestampedTuple<>(3500L, new KeyValPair<>("a", 3L));
+ tuple = new Tuple.TimestampedTuple<>(BASE + 3500L, new KeyValPair<>("a", 3L));
windowedOperator.processTuple(tuple);
Assert.assertEquals(3, sink.getCount(false));
@@ -509,7 +511,7 @@ public class WindowedOperatorTest
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
- Assert.assertEquals(1100L, window4.getBeginTimestamp());
+ Assert.assertEquals(BASE + 1100L, window4.getBeginTimestamp());
Assert.assertEquals(5900, window4.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(12L, out.getValue().getValue().longValue());
@@ -525,14 +527,14 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L)));
windowedOperator.endWindow();
Assert.assertEquals(1, keyedDataStorage.size());
- Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
- Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+ Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue());
+ Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue());
windowedOperator.teardown();
}
@@ -559,10 +561,10 @@ public class WindowedOperatorTest
windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("b", 5L)));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("a", 4L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("b", 3L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("b", 5L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("a", 4L)));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(2);
@@ -581,11 +583,11 @@ public class WindowedOperatorTest
}
sink.collectedTuples.clear();
windowedOperator.beginWindow(4);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("a", 8L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("a", 8L)));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(5);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 9L)));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 9L)));
windowedOperator.endWindow();
Map<String, Long> map = new HashMap<>();
switch (accumulationMode) {