You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/27 18:09:11 UTC
apex-malhar git commit: APEXMALHAR-2130 Spillable implementation for WindowedOperator
Repository: apex-malhar
Updated Branches:
refs/heads/master 763d14fca -> c19c80d88
APEXMALHAR-2130 Spillable implementation for WindowedOperator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c19c80d8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c19c80d8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c19c80d8
Branch: refs/heads/master
Commit: c19c80d8882fd530926093d8ce6b81c0503febc3
Parents: 763d14f
Author: David Yan <da...@datatorrent.com>
Authored: Mon Aug 15 14:19:08 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 27 09:47:49 2016 -0700
----------------------------------------------------------------------
.../spillable/SpillableSetMultimapImpl.java | 8 +-
.../apex/malhar/lib/window/Accumulation.java | 22 +-
.../apex/malhar/lib/window/ControlTuple.java | 2 +-
.../lib/window/SessionWindowedStorage.java | 8 +-
.../apex/malhar/lib/window/TriggerOption.java | 32 +--
.../apache/apex/malhar/lib/window/Tuple.java | 5 +-
.../apache/apex/malhar/lib/window/Window.java | 5 +-
.../apex/malhar/lib/window/WindowOption.java | 2 +-
.../malhar/lib/window/WindowedOperator.java | 27 +-
.../apex/malhar/lib/window/WindowedStorage.java | 32 +--
.../window/impl/AbstractWindowedOperator.java | 27 +-
.../impl/InMemorySessionWindowedStorage.java | 4 +-
.../impl/SpillableSessionWindowedStorage.java | 123 +++++++++
.../impl/SpillableWindowedKeyedStorage.java | 224 ++++++++++++++++
.../impl/SpillableWindowedPlainStorage.java | 146 +++++++++++
.../lib/helper/OperatorContextTestHelper.java | 5 +
.../window/SpillableWindowedStorageTest.java | 150 +++++++++++
.../malhar/lib/window/WindowedOperatorTest.java | 261 ++++++++++++-------
18 files changed, 904 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 951ef76..c227ed7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -167,7 +167,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
if (spillableSet != null) {
cache.remove((K)key);
Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
- map.remove(keySlice);
+ map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
spillableSet.clear();
removedSets.add(spillableSet);
}
@@ -200,7 +200,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
return true;
}
Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
- return map.containsKey(keySlice);
+ Pair<Integer, V> meta = map.get(keySlice);
+ return meta != null && meta.getLeft() > 0;
}
@Override
@@ -230,8 +231,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
cache.put(key, spillableSet);
}
- spillableSet.add(value);
- return true;
+ return spillableSet.add(value);
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
index 03f7ff7..44814d0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
@@ -37,33 +37,33 @@ public interface Accumulation<InputT, AccumT, OutputT>
/**
* Returns the default accumulated value when nothing has been accumulated
*
- * @return
+ * @return the default accumulated value
*/
AccumT defaultAccumulatedValue();
/**
* Accumulates the input to the accumulated value
*
- * @param accumulatedValue
- * @param input
- * @return
+ * @param accumulatedValue the accumulated value
+ * @param input the input value
+ * @return the result accumulated value
*/
AccumT accumulate(AccumT accumulatedValue, InputT input);
/**
* Merges two accumulated values into one
*
- * @param accumulatedValue1
- * @param accumulatedValue2
- * @return
+ * @param accumulatedValue1 the first accumulated value
+ * @param accumulatedValue2 the second accumulated value
+ * @return the result accumulated value
*/
AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);
/**
* Gets the output of the accumulated value. This is used for generating the data for triggers
*
- * @param accumulatedValue
- * @return
+ * @param accumulatedValue the accumulated value
+ * @return the output
*/
OutputT getOutput(AccumT accumulatedValue);
@@ -71,8 +71,8 @@ public interface Accumulation<InputT, AccumT, OutputT>
* Gets the retraction of the value. This is used for retracting previous panes in
* ACCUMULATING_AND_RETRACTING accumulation mode
*
- * @param value
- * @return
+ * @param value the value to be retracted
+ * @return the retracted value
*/
OutputT getRetraction(OutputT value);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
index 3288398..69093e5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ControlTuple.java
@@ -37,7 +37,7 @@ public interface ControlTuple
/**
* Gets the timestamp associated with this watermark
*
- * @return
+ * @return the timestamp
*/
long getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 4cb2b1a..3e25d15 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -38,8 +38,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
* Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
* data to toWindow, and overwrite any existing data in toWindow
*
- * @param fromWindow
- * @param toWindow
+ * @param fromWindow the window we want to migrate from
+ * @param toWindow the window we want to migrate to
*/
void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
@@ -51,8 +51,8 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
*
* @param key the key
* @param timestamp the timestamp
- * @param gap
- * @return
+ * @param gap the minimum gap
+ * @return the windows
*/
Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
index 266577f..bd9cd9c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java
@@ -138,7 +138,7 @@ public class TriggerOption
/**
* Creates a TriggerOption with an initial trigger that should be fired at the watermark
*
- * @return
+ * @return the TriggerOption
*/
public static TriggerOption AtWatermark()
{
@@ -151,8 +151,8 @@ public class TriggerOption
/**
* A trigger should be fired before the watermark once for every specified duration
*
- * @param duration
- * @return
+ * @param duration the duration
+ * @return the TriggerOption
*/
public TriggerOption withEarlyFiringsAtEvery(Duration duration)
{
@@ -164,8 +164,8 @@ public class TriggerOption
/**
* A trigger should be fired before the watermark once for every n tuple(s)
*
- * @param count
- * @return
+ * @param count the count
+ * @return the TriggerOption
*/
public TriggerOption withEarlyFiringsAtEvery(long count)
{
@@ -177,8 +177,8 @@ public class TriggerOption
/**
* A trigger should be fired after the watermark once for every specified duration
*
- * @param duration
- * @return
+ * @param duration the duration
+ * @return the TriggerOption
*/
public TriggerOption withLateFiringsAtEvery(Duration duration)
{
@@ -190,8 +190,8 @@ public class TriggerOption
/**
* A trigger should be fired after the watermark once for every n late tuple(s)
*
- * @param count
- * @return
+ * @param count the count
+ * @return the TriggerOption
*/
public TriggerOption withLateFiringsAtEvery(long count)
{
@@ -203,7 +203,7 @@ public class TriggerOption
/**
* With discarding mode, the state is discarded after each trigger
*
- * @return
+ * @return the TriggerOption
*/
public TriggerOption discardingFiredPanes()
{
@@ -214,7 +214,7 @@ public class TriggerOption
/**
* With accumulating mode, the state is kept
*
- * @return
+ * @return the TriggerOption
*/
public TriggerOption accumulatingFiredPanes()
{
@@ -227,7 +227,7 @@ public class TriggerOption
* so when new values come in that change the state, a retraction trigger can be fired with the snapshot of the state
* when the last trigger was fired
*
- * @return
+ * @return the TriggerOption
*/
public TriggerOption accumulatingAndRetractingFiredPanes()
{
@@ -239,7 +239,7 @@ public class TriggerOption
* Only fire triggers for data that has changed from the last trigger. This only applies to ACCUMULATING and
* ACCUMULATING_AND_RETRACTING accumulation modes.
*
- * @return
+ * @return the TriggerOption
*/
public TriggerOption firingOnlyUpdatedPanes()
{
@@ -250,7 +250,7 @@ public class TriggerOption
/**
* Gets the accumulation mode
*
- * @return
+ * @return the AccumulationMode
*/
public AccumulationMode getAccumulationMode()
{
@@ -260,7 +260,7 @@ public class TriggerOption
/**
* Gets the trigger list
*
- * @return
+ * @return the trigger list
*/
public List<Trigger> getTriggerList()
{
@@ -271,7 +271,7 @@ public class TriggerOption
* Returns whether we should only fire panes that have been updated since the last trigger.
* When this option is set, DISCARDING accumulation mode must not be used.
*
- * @return
+ * @return whether we want to fire only updated panes
*/
public boolean isFiringOnlyUpdatedPanes()
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
index aea6bf6..c7eba4e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
@@ -35,7 +35,7 @@ public interface Tuple<T>
/**
* Gets the value of the tuple
*
- * @return
+ * @return the value
*/
T getValue();
@@ -81,6 +81,7 @@ public interface Tuple<T>
}
@Override
+ @SuppressWarnings("unchecked")
public boolean equals(Object obj)
{
if (obj instanceof PlainTuple) {
@@ -133,6 +134,7 @@ public interface Tuple<T>
}
@Override
+ @SuppressWarnings("unchecked")
public boolean equals(Object obj)
{
if (obj instanceof TimestampedTuple && super.equals(obj)) {
@@ -186,6 +188,7 @@ public interface Tuple<T>
}
@Override
+ @SuppressWarnings("unchecked")
public boolean equals(Object obj)
{
if (obj instanceof WindowedTuple && super.equals(obj)) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 50d6445..1d7681d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -105,7 +105,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
/**
* Gets the beginning timestamp of this window
*
- * @return
+ * @return the begin timestamp
*/
@Override
public long getBeginTimestamp()
@@ -116,7 +116,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
/**
* Gets the duration millis of this window
*
- * @return
+ * @return the duration
*/
@Override
public long getDurationMillis()
@@ -198,6 +198,7 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI
return false;
}
if (other instanceof SessionWindow) {
+ @SuppressWarnings("unchecked")
SessionWindow<K> otherSessionWindow = (SessionWindow<K>)other;
if (key == null) {
return otherSessionWindow.key == null;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
index 099709d..a88250e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
@@ -73,7 +73,7 @@ public interface WindowOption
/**
* The time window should be a sliding window with the given slide duration
*
- * @param duration
+ * @param duration the slide by duration
* @return the SlidingTimeWindows
*/
public SlidingTimeWindows slideBy(Duration duration)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
index ccc7ae1..400a97b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
@@ -42,36 +42,36 @@ public interface WindowedOperator<InputT>
/**
* Sets the WindowOption of this operator
*
- * @param windowOption
+ * @param windowOption the window option
*/
void setWindowOption(WindowOption windowOption);
/**
* Sets the TriggerOption of this operator
*
- * @param triggerOption
+ * @param triggerOption the trigger option
*/
void setTriggerOption(TriggerOption triggerOption);
/**
* Sets the allowed lateness of this operator
*
- * @param allowedLateness
+ * @param allowedLateness the allowed lateness
*/
void setAllowedLateness(Duration allowedLateness);
/**
* This sets the function that extracts the timestamp from the input tuple
*
- * @param timestampExtractor
+ * @param timestampExtractor the timestamp extractor
*/
void setTimestampExtractor(Function<InputT, Long> timestampExtractor);
/**
* Assign window(s) for this input tuple
*
- * @param input
- * @return
+ * @param input the input tuple
+ * @return the windowed tuple
*/
Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input);
@@ -80,8 +80,8 @@ public interface WindowedOperator<InputT>
* The implementation of this operator should look at the allowed lateness in the WindowOption.
* It should also call this function and if it returns true, it should drop the associated tuple.
*
- * @param timestamp
- * @return
+ * @param timestamp the timestamp
+ * @return whether the timestamp is considered too late
*/
boolean isTooLate(long timestamp);
@@ -89,14 +89,14 @@ public interface WindowedOperator<InputT>
* This method is supposed to drop the tuple because it has passed the allowed lateness. But an implementation
* of this method has the chance to do something different (e.g. emit it to another port)
*
- * @param input
+ * @param input the input tuple
*/
void dropTuple(Tuple<InputT> input);
/**
* This method accumulates the incoming tuple (with the Accumulation interface)
*
- * @param tuple
+ * @param tuple the input tuple
*/
void accumulateTuple(Tuple.WindowedTuple<InputT> tuple);
@@ -106,7 +106,7 @@ public interface WindowedOperator<InputT>
* and change the state of each of those windows. All tuples for those windows arriving after
* the watermark will be considered late.
*
- * @param watermark
+ * @param watermark the watermark tuple
*/
void processWatermark(ControlTuple.Watermark watermark);
@@ -114,14 +114,15 @@ public interface WindowedOperator<InputT>
* This method fires the trigger for the given window, and possibly retraction trigger. The implementation should clear
* the window data in the storage if the accumulation mode is DISCARDING
*
- * @param window
+ * @param window the window
+ * @param windowState the window state
*/
void fireTrigger(Window window, WindowState windowState);
/**
* This method clears the window data in the storage.
*
- * @param window
+ * @param window the window
*/
void clearWindowData(Window window);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index 42ecdae..e2874ba 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -37,21 +37,21 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
/**
* Returns true if the storage contains this window
*
- * @param window
+ * @param window the window
*/
boolean containsWindow(Window window);
/**
* Returns the number of windows in the storage
*
- * @return
+ * @return the number of windows
*/
long size();
/**
* Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state
*
- * @param window
+ * @param window the window
*/
void remove(Window window);
@@ -68,23 +68,23 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
/**
* Sets the data associated with the given window
*
- * @param window
- * @param value
+ * @param window the window
+ * @param value the value
*/
void put(Window window, T value);
/**
* Gets the value associated with the given window
*
- * @param window
- * @return
+ * @param window the window
+ * @return the value
*/
T get(Window window);
/**
* Returns the iterable of the entries in the storage
*
- * @return
+ * @return the entries
*/
Iterable<Map.Entry<Window, T>> entries();
}
@@ -100,26 +100,26 @@ public interface WindowedStorage extends Component<Context.OperatorContext>
/**
* Sets the data associated with the given window and the key
*
- * @param window
- * @param key
- * @param value
+ * @param window the window
+ * @param key the key
+ * @param value the value
*/
void put(Window window, K key, V value);
/**
* Gets an iterable object over the key/value pairs associated with the given window
*
- * @param window
- * @return
+ * @param window the window
+ * @return the entries
*/
Iterable<Map.Entry<K, V>> entries(Window window);
/**
* Gets the data associated with the given window and the key
*
- * @param window
- * @param key
- * @return
+ * @param window the window
+ * @param key the key
+ * @return the value
*/
V get(Window window, K key);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index f90d47d..c778523 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
/**
* Process the incoming data tuple
*
- * @param tuple
+ * @param tuple the incoming tuple
*/
public void processTuple(Tuple<InputT> tuple)
{
@@ -206,21 +206,21 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
/**
* This method sets the storage for the data for each window
*
- * @param storageAgent
+ * @param dataStorage The data storage
*/
- public void setDataStorage(DataStorageT storageAgent)
+ public void setDataStorage(DataStorageT dataStorage)
{
- this.dataStorage = storageAgent;
+ this.dataStorage = dataStorage;
}
/**
* This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING
*
- * @param storageAgent
+ * @param retractionStorage The retraction storage
*/
- public void setRetractionStorage(RetractionStorageT storageAgent)
+ public void setRetractionStorage(RetractionStorageT retractionStorage)
{
- this.retractionStorage = storageAgent;
+ this.retractionStorage = retractionStorage;
}
public void addComponent(String key, Component<Context.OperatorContext> component)
@@ -232,7 +232,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
* to put in the pane when a trigger is fired
*
- * @param accumulation
+ * @param accumulation the accumulation
*/
public void setAccumulation(AccumulationT accumulation)
{
@@ -345,8 +345,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned.
* Note that this method does not apply to SessionWindows.
*
- * @param timestamp
- * @return
+ * @param timestamp the timestamp
+ * @return the windows this timestamp belongs to
*/
private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
{
@@ -378,7 +378,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
@Override
public boolean isTooLate(long timestamp)
{
- return allowedLatenessMillis < 0 ? false : (timestamp < currentWatermark - allowedLatenessMillis);
+ return allowedLatenessMillis >= 0 && (timestamp < currentWatermark - allowedLatenessMillis);
}
@Override
@@ -395,6 +395,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
@Override
+ @SuppressWarnings("unchecked")
public void setup(Context.OperatorContext context)
{
this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
@@ -537,7 +538,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
/**
* This method fires the normal trigger for the given window.
*
- * @param window
+ * @param window the window to fire trigger on
* @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required.
*/
public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes);
@@ -546,7 +547,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* This method fires the retraction trigger for the given window. This should only be valid if the accumulation
* mode is ACCUMULATING_AND_RETRACTING
*
- * @param window
+ * @param window the window to fire the retraction trigger on
*/
public abstract void fireRetractionTrigger(Window window);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index fdceb4d..906b1b9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -45,13 +45,15 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS
@Override
public void put(Window window, K key, V value)
{
+ @SuppressWarnings("unchecked")
+ Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
super.put(window, key, value);
TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
if (sessionWindows == null) {
sessionWindows = new TreeSet<>();
keyToWindows.put(key, sessionWindows);
}
- sessionWindows.add((Window.SessionWindow<K>)window);
+ sessionWindows.add(sessionWindow);
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
new file mode 100644
index 0000000..8779739
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Spillable session windowed storage.
+ */
+@InterfaceStability.Evolving
+public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeyedStorage<K, V> implements SessionWindowedStorage<K, V>
+{
+ // additional key to windows map for fast lookup of windows using key
+ private Spillable.SpillableSetMultimap<K, Window.SessionWindow<K>> keyToWindowsMap;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ if (keyToWindowsMap == null) {
+ // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
+ // This is logged in APEXMALHAR-2271
+ keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void remove(Window window)
+ {
+ super.remove(window);
+ Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+ keyToWindowsMap.remove(sessionWindow.getKey(), sessionWindow);
+ }
+
+ @Override
+ public void put(Window window, K key, V value)
+ {
+ super.put(window, key, value);
+ @SuppressWarnings("unchecked")
+ Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+ if (!keyToWindowsMap.containsEntry(key, sessionWindow)) {
+ keyToWindowsMap.put(key, sessionWindow);
+ }
+ }
+
+ @Override
+ public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
+ {
+ Set<K> keys = windowToKeysMap.get(fromWindow);
+ if (keys == null) {
+ return;
+ }
+ windowKeyToValueMap.remove(toWindow);
+ for (K key : keys) {
+ windowToKeysMap.put(toWindow, key);
+ ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key);
+ ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key);
+
+ V value = windowKeyToValueMap.get(oldKey);
+ windowKeyToValueMap.remove(oldKey);
+ windowKeyToValueMap.put(newKey, value);
+ keyToWindowsMap.remove(key, fromWindow);
+ keyToWindowsMap.put(key, toWindow);
+ }
+ windowToKeysMap.removeAll(fromWindow);
+ }
+
+ @Override
+ public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
+ {
+ List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
+ Set<Window.SessionWindow<K>> sessionWindows = keyToWindowsMap.get(key);
+ if (sessionWindows != null) {
+ for (Window.SessionWindow<K> window : sessionWindows) {
+ if (timestamp > window.getBeginTimestamp()) {
+ if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
+ results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+ }
+ } else if (timestamp < window.getBeginTimestamp()) {
+ if (window.getBeginTimestamp() - gap <= timestamp) {
+ results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+ }
+ } else {
+ results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
+ }
+ }
+ }
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
new file mode 100644
index 0000000..ac77d1b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Implementation of WindowedKeyedStorage using {@link Spillable} data structures
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+ @NotNull
+ protected SpillableComplexComponent scc;
+ protected long bucket;
+ protected Serde<Window, Slice> windowSerde;
+ protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde;
+ protected Serde<K, Slice> keySerde;
+ protected Serde<V, Slice> valueSerde;
+
+ protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap;
+ protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
+
+ private class KVIterator implements Iterator<Map.Entry<K, V>>
+ {
+ final Window window;
+ final Set<K> keys;
+ Iterator<K> iterator;
+
+ KVIterator(Window window)
+ {
+ this.window = window;
+ this.keys = windowToKeysMap.get(window);
+ if (this.keys != null) {
+ this.iterator = this.keys.iterator();
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator != null && iterator.hasNext();
+ }
+
+ @Override
+ public Map.Entry<K, V> next()
+ {
+ K key = iterator.next();
+ return new AbstractMap.SimpleEntry<>(key, windowKeyToValueMap.get(new ImmutablePair<>(window, key)));
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public SpillableWindowedKeyedStorage()
+ {
+ }
+
+ public SpillableWindowedKeyedStorage(long bucket,
+ Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
+ {
+ this.bucket = bucket;
+ this.windowSerde = windowSerde;
+ this.windowKeyPairSerde = windowKeyPairSerde;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ public void setSpillableComplexComponent(SpillableComplexComponent scc)
+ {
+ this.scc = scc;
+ }
+
+ public SpillableComplexComponent getSpillableComplexComponent()
+ {
+ return this.scc;
+ }
+
+ public void setBucket(long bucket)
+ {
+ this.bucket = bucket;
+ }
+
+ public void setWindowSerde(Serde<Window, Slice> windowSerde)
+ {
+ this.windowSerde = windowSerde;
+ }
+
+ public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde)
+ {
+ this.windowKeyPairSerde = windowKeyPairSerde;
+ }
+
+ public void setValueSerde(Serde<V, Slice> valueSerde)
+ {
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public boolean containsWindow(Window window)
+ {
+ return windowToKeysMap.containsKey(window);
+ }
+
+ @Override
+ public long size()
+ {
+ return windowToKeysMap.size();
+ }
+
+ @Override
+ public void remove(Window window)
+ {
+ Set<K> keys = windowToKeysMap.get(window);
+ if (keys != null) {
+ for (K key : keys) {
+ windowKeyToValueMap.remove(new ImmutablePair<>(window, key));
+ }
+ }
+ windowToKeysMap.removeAll(window);
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (bucket == 0) {
+ // choose a bucket that is guaranteed to be unique in Apex
+ bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+ }
+ // set default serdes
+ if (windowSerde == null) {
+ windowSerde = new SerdeKryoSlice<>();
+ }
+ if (windowKeyPairSerde == null) {
+ windowKeyPairSerde = new SerdeKryoSlice<>();
+ }
+ if (keySerde == null) {
+ keySerde = new SerdeKryoSlice<>();
+ }
+ if (valueSerde == null) {
+ valueSerde = new SerdeKryoSlice<>();
+ }
+
+ if (windowKeyToValueMap == null) {
+ windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde);
+ }
+ if (windowToKeysMap == null) {
+ windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void put(Window window, K key, V value)
+ {
+ if (!windowToKeysMap.containsEntry(window, key)) {
+ windowToKeysMap.put(window, key);
+ }
+ windowKeyToValueMap.put(new ImmutablePair<>(window, key), value);
+ }
+
+ @Override
+ public Iterable<Map.Entry<K, V>> entries(final Window window)
+ {
+ return new Iterable<Map.Entry<K, V>>()
+ {
+ @Override
+ public Iterator<Map.Entry<K, V>> iterator()
+ {
+ return new KVIterator(window);
+ }
+ };
+ }
+
+ @Override
+ public V get(Window window, K key)
+ {
+ return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
new file mode 100644
index 0000000..81f5dbb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
+ *
+ * @param <T> Type of the value per window
+ */
+public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
+{
+ @NotNull
+ private SpillableComplexComponent scc;
+ private long bucket;
+ private Serde<Window, Slice> windowSerde;
+ private Serde<T, Slice> valueSerde;
+
+ protected Spillable.SpillableByteMap<Window, T> windowToDataMap;
+
+ public SpillableWindowedPlainStorage()
+ {
+ }
+
+ public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+ {
+ this.bucket = bucket;
+ this.windowSerde = windowSerde;
+ this.valueSerde = valueSerde;
+ }
+
+ public void setSpillableComplexComponent(SpillableComplexComponent scc)
+ {
+ this.scc = scc;
+ }
+
+ public SpillableComplexComponent getSpillableComplexComponent()
+ {
+ return scc;
+ }
+
+ public void setBucket(long bucket)
+ {
+ this.bucket = bucket;
+ }
+
+ public void setWindowSerde(Serde<Window, Slice> windowSerde)
+ {
+ this.windowSerde = windowSerde;
+ }
+
+ public void setValueSerde(Serde<T, Slice> valueSerde)
+ {
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public void put(Window window, T value)
+ {
+ windowToDataMap.put(window, value);
+ }
+
+ @Override
+ public T get(Window window)
+ {
+ return windowToDataMap.get(window);
+ }
+
+ @Override
+ public Iterable<Map.Entry<Window, T>> entries()
+ {
+ return windowToDataMap.entrySet();
+ }
+
+ @Override
+ public boolean containsWindow(Window window)
+ {
+ return windowToDataMap.containsKey(window);
+ }
+
+ @Override
+ public long size()
+ {
+ return windowToDataMap.size();
+ }
+
+ @Override
+ public void remove(Window window)
+ {
+ windowToDataMap.remove(window);
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (bucket == 0) {
+ // choose a bucket that is almost guaranteed to be unique
+ bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+ }
+ // set default serdes
+ if (windowSerde == null) {
+ windowSerde = new SerdeKryoSlice<>();
+ }
+ if (valueSerde == null) {
+ valueSerde = new SerdeKryoSlice<>();
+ }
+ if (windowToDataMap == null) {
+ windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
index 2ece6b2..8fa814d 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
@@ -60,6 +60,11 @@ public class OperatorContextTestHelper
this.attributes = map;
}
+    /**
+     * Exposes the attribute map held in {@code this.attributes}, so that callers can read or
+     * modify the attributes of this context.
+     */
+    public com.datatorrent.api.Attribute.AttributeMap getAttributes()
+    {
+      return this.attributes;
+    }
+
@Override
public int getId()
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
new file mode 100644
index 0000000..3b7789c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Unit tests for Spillable Windowed Storage
+ */
+ public class SpillableWindowedStorageTest
+ {
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ /**
+ * Writes values for three windows over streaming windows 1000-1003, snapshots the storage
+ * with KryoCloneUtils after streaming window 1001, then swaps in the snapshot and checks
+ * that it returns the values as they were at the end of streaming window 1001.
+ */
+ @Test
+ public void testWindowedPlainStorage()
+ {
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
+ Window window1 = new Window.TimeWindow<>(1000, 10);
+ Window window2 = new Window.TimeWindow<>(1010, 10);
+ Window window3 = new Window.TimeWindow<>(1020, 10);
+ // the component is set up before the storage that creates its map from it
+ storage.setSpillableComplexComponent(sccImpl);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+ storage.setup(testMeta.operatorContext);
+
+ sccImpl.beginWindow(1000);
+ storage.put(window1, 1);
+ storage.put(window2, 2);
+ storage.put(window3, 3);
+ sccImpl.endWindow();
+ sccImpl.beginWindow(1001);
+ storage.put(window1, 4);
+ storage.put(window2, 5);
+ sccImpl.endWindow();
+ // take the snapshot between beforeCheckpoint and checkpointed of streaming window 1001
+ sccImpl.beforeCheckpoint(1001);
+ SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
+ sccImpl.checkpointed(1001);
+
+
+ // writes after the snapshot; window3 is left untouched
+ sccImpl.beginWindow(1002);
+ storage.put(window1, 6);
+ storage.put(window2, 7);
+ sccImpl.endWindow();
+
+ Assert.assertEquals(6L, storage.get(window1).longValue());
+ Assert.assertEquals(7L, storage.get(window2).longValue());
+ Assert.assertEquals(3L, storage.get(window3).longValue());
+
+ sccImpl.beginWindow(1003);
+ storage.put(window1, 8);
+ storage.put(window2, 9);
+ sccImpl.endWindow();
+
+ // simulating crash here
+ storage.teardown();
+ storage.getSpillableComplexComponent().teardown();
+
+ // continue with the snapshot, with the activation window id set to 1001
+ storage = clonedStorage;
+ testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+ storage.setup(testMeta.operatorContext);
+
+ // recovery at window 1002
+ // NOTE(review): beginWindow is invoked on the original sccImpl, which was torn down above,
+ // rather than on the component held by the cloned storage (presumably a distinct instance)
+ // - confirm this is intended
+ sccImpl.beginWindow(1002);
+ Assert.assertEquals(4L, storage.get(window1).longValue());
+ Assert.assertEquals(5L, storage.get(window2).longValue());
+ Assert.assertEquals(3L, storage.get(window3).longValue());
+ }
+
+ /**
+ * Keyed counterpart of the plain-storage test: writes values for key "x" in three windows,
+ * snapshots the storage with KryoCloneUtils after streaming window 1001, adds key "y" after
+ * the snapshot, then swaps in the snapshot and checks that it returns the values as of
+ * streaming window 1001 and has no value for "y".
+ */
+ @Test
+ public void testWindowedKeyedStorage()
+ {
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>();
+ Window window1 = new Window.TimeWindow<>(1000, 10);
+ Window window2 = new Window.TimeWindow<>(1010, 10);
+ Window window3 = new Window.TimeWindow<>(1020, 10);
+ // the component is set up before the storage that creates its maps from it
+ storage.setSpillableComplexComponent(sccImpl);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+ storage.setup(testMeta.operatorContext);
+
+ sccImpl.beginWindow(1000);
+ storage.put(window1, "x", 1);
+ storage.put(window2, "x", 2);
+ storage.put(window3, "x", 3);
+ sccImpl.endWindow();
+ sccImpl.beginWindow(1001);
+ storage.put(window1, "x", 4);
+ storage.put(window2, "x", 5);
+ sccImpl.endWindow();
+ // take the snapshot between beforeCheckpoint and checkpointed of streaming window 1001
+ sccImpl.beforeCheckpoint(1001);
+ SpillableWindowedKeyedStorage<String, Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
+ sccImpl.checkpointed(1001);
+
+ // writes after the snapshot, including a key ("y") the snapshot has never seen
+ sccImpl.beginWindow(1002);
+ storage.put(window1, "x", 6);
+ storage.put(window2, "x", 7);
+ storage.put(window2, "y", 8);
+ sccImpl.endWindow();
+
+ Assert.assertEquals(6L, storage.get(window1, "x").longValue());
+ Assert.assertEquals(7L, storage.get(window2, "x").longValue());
+ Assert.assertEquals(3L, storage.get(window3, "x").longValue());
+ Assert.assertEquals(8L, storage.get(window2, "y").longValue());
+
+ // simulating crash here
+ storage.teardown();
+ storage.getSpillableComplexComponent().teardown();
+
+ // continue with the snapshot, with the activation window id set to 1001
+ storage = clonedStorage;
+ testMeta.operatorContext.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1001L);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+ storage.setup(testMeta.operatorContext);
+
+ // recovery at window 1002
+ // NOTE(review): beginWindow is invoked on the original sccImpl, which was torn down above,
+ // rather than on the component held by the cloned storage (presumably a distinct instance)
+ // - confirm this is intended
+ sccImpl.beginWindow(1002);
+ Assert.assertEquals(4L, storage.get(window1, "x").longValue());
+ Assert.assertEquals(5L, storage.get(window2, "x").longValue());
+ Assert.assertEquals(3L, storage.get(window3, "x").longValue());
+ Assert.assertNull(storage.get(window2, "y"));
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c19c80d8/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 7396994..4edbcd0 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -18,35 +18,64 @@
*/
package org.apache.apex.malhar.lib.window;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
import javax.validation.ValidationException;
import org.joda.time.Duration;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.SpillableSessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
import org.apache.commons.lang3.mutable.MutableLong;
-import com.datatorrent.api.Attribute;
import com.datatorrent.api.Sink;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
/**
* Unit tests for WindowedOperator
*/
+@RunWith(Parameterized.class)
public class WindowedOperatorTest
{
+ @Parameterized.Parameters
+ public static Collection<Object[]> testParameters()
+ {
+ return Arrays.asList(new Object[][]{{false}, {true}});
+ }
+
+ @Parameterized.Parameter
+ public Boolean useSpillable;
+
+ private WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage;
+ private WindowedStorage.WindowedPlainStorage<MutableLong> plainDataStorage;
+ private WindowedStorage.WindowedPlainStorage<Long> plainRetractionStorage;
+ private WindowedStorage.WindowedKeyedStorage<String, MutableLong> keyedDataStorage;
+ private WindowedStorage.WindowedKeyedStorage<String, Long> keyedRetractionStorage;
+ private SpillableComplexComponentImpl sccImpl;
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
private void verifyValidationFailure(WindowedOperatorImpl windowedOperator, String message)
{
try {
@@ -60,19 +89,61 @@ public class WindowedOperatorTest
private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
- windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
- windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
- windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ if (useSpillable) {
+ sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys.
+ windowStateStorage = new InMemoryWindowedStorage<>();
+ SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>();
+ pds.setSpillableComplexComponent(sccImpl);
+ plainDataStorage = pds;
+ SpillableWindowedPlainStorage<Long> prs = new SpillableWindowedPlainStorage<>();
+ prs.setSpillableComplexComponent(sccImpl);
+ plainRetractionStorage = prs;
+ windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+ } else {
+ windowStateStorage = new InMemoryWindowedStorage<>();
+ plainDataStorage = new InMemoryWindowedStorage<>();
+ plainRetractionStorage = new InMemoryWindowedStorage<>();
+ }
+ windowedOperator.setDataStorage(plainDataStorage);
+ windowedOperator.setRetractionStorage(plainRetractionStorage);
+ windowedOperator.setWindowStateStorage(windowStateStorage);
windowedOperator.setAccumulation(new SumAccumulation());
return windowedOperator;
}
- private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator()
+ private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean forSession)
{
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
- windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
- windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
- windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ if (useSpillable) {
+ sccImpl = new SpillableComplexComponentImpl(testMeta.store);
+ // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys.
+ windowStateStorage = new InMemoryWindowedStorage<>();
+ if (forSession) {
+ SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>();
+ sws.setSpillableComplexComponent(sccImpl);
+ keyedDataStorage = sws;
+ } else {
+ SpillableWindowedKeyedStorage<String, MutableLong> kds = new SpillableWindowedKeyedStorage<>();
+ kds.setSpillableComplexComponent(sccImpl);
+ keyedDataStorage = kds;
+ }
+ SpillableWindowedKeyedStorage<String, Long> krs = new SpillableWindowedKeyedStorage<>();
+ krs.setSpillableComplexComponent(sccImpl);
+ keyedRetractionStorage = krs;
+ windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+ } else {
+ windowStateStorage = new InMemoryWindowedStorage<>();
+ if (forSession) {
+ keyedDataStorage = new InMemorySessionWindowedStorage<>();
+ } else {
+ keyedDataStorage = new InMemoryWindowedKeyedStorage<>();
+ }
+ keyedRetractionStorage = new InMemoryWindowedKeyedStorage<>();
+ }
+ windowedOperator.setDataStorage(keyedDataStorage);
+ windowedOperator.setRetractionStorage(keyedRetractionStorage);
+ windowedOperator.setWindowStateStorage(windowStateStorage);
windowedOperator.setAccumulation(new SumAccumulation());
return windowedOperator;
}
@@ -102,8 +173,6 @@ public class WindowedOperatorTest
@Test
public void testWatermarkAndAllowedLateness()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
CollectorTestSink controlSink = new CollectorTestSink();
@@ -112,26 +181,20 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setAllowedLateness(Duration.millis(1000));
- WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
- WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
-
- windowedOperator.setDataStorage(dataStorage);
- windowedOperator.setWindowStateStorage(windowStateStorage);
-
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
- Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
+ Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size());
Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next();
Window window = entry.getKey();
WindowState windowState = entry.getValue();
Assert.assertEquals(-1, windowState.watermarkArrivalTime);
- Assert.assertEquals(2L, dataStorage.get(window).longValue());
+ Assert.assertEquals(2L, plainDataStorage.get(window).longValue());
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
- Assert.assertEquals(5L, dataStorage.get(window).longValue());
+ Assert.assertEquals(5L, plainDataStorage.get(window).longValue());
windowedOperator.processWatermark(new WatermarkImpl(1200));
windowedOperator.endWindow();
@@ -140,15 +203,16 @@ public class WindowedOperatorTest
windowedOperator.beginWindow(2);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
- Assert.assertEquals("Late but not too late", 9L, dataStorage.get(window).longValue());
+ Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue());
windowedOperator.processWatermark(new WatermarkImpl(3000));
windowedOperator.endWindow();
Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
windowedOperator.beginWindow(3);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
- Assert.assertEquals("The window should be dropped because it's too late", 0, dataStorage.size());
+ Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size());
Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
windowedOperator.endWindow();
+ windowedOperator.teardown();
}
private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
@@ -172,9 +236,7 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
CollectorTestSink sink = new CollectorTestSink();
windowedOperator.output.setSink(sink);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
@@ -212,6 +274,7 @@ public class WindowedOperatorTest
default:
throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
}
+ windowedOperator.teardown();
}
@Test
@@ -233,71 +296,77 @@ public class WindowedOperatorTest
}
@Test
+ public void testTriggerWithAccumulatingModeFiringAllPanes()
+ {
+ testTriggerWithAccumulatingModeHelper(false);
+ }
+
+ @Test
public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
{
- for (boolean firingOnlyUpdatedPanes : new boolean[]{true, false}) {
- WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
- TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
- .accumulatingFiredPanes();
- if (firingOnlyUpdatedPanes) {
- triggerOption.firingOnlyUpdatedPanes();
- }
- windowedOperator.setTriggerOption(triggerOption);
- windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
- CollectorTestSink sink = new CollectorTestSink();
- windowedOperator.output.setSink(sink);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
- windowedOperator.setup(context);
- windowedOperator.beginWindow(1);
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
- windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
- windowedOperator.endWindow();
- Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
- windowedOperator.beginWindow(2);
- windowedOperator.endWindow();
- Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
- windowedOperator.beginWindow(3);
- windowedOperator.endWindow();
+ testTriggerWithAccumulatingModeHelper(true);
+ }
+
+ public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes)
+ {
+ WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+ TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
+ .accumulatingFiredPanes();
+ if (firingOnlyUpdatedPanes) {
+ triggerOption.firingOnlyUpdatedPanes();
+ }
+ windowedOperator.setTriggerOption(triggerOption);
+ windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+ CollectorTestSink sink = new CollectorTestSink();
+ windowedOperator.output.setSink(sink);
+ windowedOperator.setup(testMeta.operatorContext);
+ windowedOperator.beginWindow(1);
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+ windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+ windowedOperator.endWindow();
+ Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+ windowedOperator.beginWindow(2);
+ windowedOperator.endWindow();
+ Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+ windowedOperator.beginWindow(3);
+ windowedOperator.endWindow();
+ Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+ Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+ sink.collectedTuples.clear();
+ windowedOperator.beginWindow(4);
+ windowedOperator.endWindow();
+ Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+ windowedOperator.beginWindow(5);
+ windowedOperator.endWindow();
+ if (firingOnlyUpdatedPanes) {
+ Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples
+ .isEmpty());
+ } else {
Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
- sink.collectedTuples.clear();
- windowedOperator.beginWindow(4);
- windowedOperator.endWindow();
- Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
- windowedOperator.beginWindow(5);
- windowedOperator.endWindow();
- if (firingOnlyUpdatedPanes) {
- Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty());
- } else {
- Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
- Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
- }
}
+ windowedOperator.teardown();
}
@Test
public void testGlobalWindowAssignment()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
+ windowedOperator.teardown();
}
@Test
public void testTimeWindowAssignment()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
@@ -309,11 +378,9 @@ public class WindowedOperatorTest
@Test
public void testSlidingWindowAssignment()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
Collection<? extends Window> windows = windowedValue.getWindows();
Window[] winArray = windows.toArray(new Window[]{});
@@ -328,21 +395,18 @@ public class WindowedOperatorTest
Assert.assertEquals(1000, winArray[3].getDurationMillis());
Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
Assert.assertEquals(1000, winArray[4].getDurationMillis());
+ windowedOperator.teardown();
}
@Test
public void testSessionWindows()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
- KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
- windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>());
- windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>());
+ KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(true);
windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
windowedOperator.processTuple(tuple);
@@ -400,16 +464,24 @@ public class WindowedOperatorTest
Assert.assertEquals(3, sink.getCount(false));
// retraction of the two old windows
+ Map<Window, KeyValPair<String, Long>> tuples = new TreeMap<>();
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
- Assert.assertEquals(window2, out.getWindows().iterator().next());
- Assert.assertEquals("a", out.getValue().getKey());
- Assert.assertEquals(-5L, out.getValue().getValue().longValue());
+ tuples.put(out.getWindows().iterator().next(), out.getValue());
+
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
Assert.assertEquals(1, out.getWindows().size());
- Assert.assertEquals(window3, out.getWindows().iterator().next());
- Assert.assertEquals("a", out.getValue().getKey());
- Assert.assertEquals(-4L, out.getValue().getValue().longValue());
+ tuples.put(out.getWindows().iterator().next(), out.getValue());
+
+ Iterator<Map.Entry<Window, KeyValPair<String, Long>>> iterator = tuples.entrySet().iterator();
+ Map.Entry<Window, KeyValPair<String, Long>> entry = iterator.next();
+ Assert.assertEquals(window2, entry.getKey());
+ Assert.assertEquals("a", entry.getValue().getKey());
+ Assert.assertEquals(-5L, entry.getValue().getValue().longValue());
+ entry = iterator.next();
+ Assert.assertEquals(window3, entry.getKey());
+ Assert.assertEquals("a", entry.getValue().getKey());
+ Assert.assertEquals(-4L, entry.getValue().getValue().longValue());
// normal trigger
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
@@ -421,32 +493,30 @@ public class WindowedOperatorTest
Assert.assertEquals(12L, out.getValue().getValue().longValue());
windowedOperator.endWindow();
+ windowedOperator.teardown();
}
@Test
public void testKeyedAccumulation()
{
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
- KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+ KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
- WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
- windowedOperator.setDataStorage(dataStorage);
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
windowedOperator.endWindow();
- Assert.assertEquals(1, dataStorage.size());
- Assert.assertEquals(5L, dataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
- Assert.assertEquals(9L, dataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+ Assert.assertEquals(1, keyedDataStorage.size());
+ Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
+ Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+ windowedOperator.teardown();
}
private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode)
{
- KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+ KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
switch (accumulationMode) {
case ACCUMULATING:
@@ -465,9 +535,7 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
- new Attribute.AttributeMap.DefaultAttributeMap());
- windowedOperator.setup(context);
+ windowedOperator.setup(testMeta.operatorContext);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
@@ -530,6 +598,7 @@ public class WindowedOperatorTest
default:
throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
}
+ windowedOperator.teardown();
}
@Test