You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/14 18:44:59 UTC
apex-malhar git commit: APEXMALHAR-2130 WindowStorage interface
changes in preparation of incorporating spillable data structures
Repository: apex-malhar
Updated Branches:
refs/heads/master b811e3356 -> 9f9da0ee1
APEXMALHAR-2130 WindowStorage interface changes in preparation of incorporating spillable data structures
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f9da0ee
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f9da0ee
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f9da0ee
Branch: refs/heads/master
Commit: 9f9da0ee15e00b51d57725c75119c145697bfd64
Parents: b811e33
Author: David Yan <da...@datatorrent.com>
Authored: Sun Sep 11 00:30:20 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Wed Sep 14 11:24:56 2016 -0700
----------------------------------------------------------------------
.../stream/sample/complete/AutoComplete.java | 2 +-
.../sample/complete/TopWikipediaSessions.java | 14 +-
.../sample/complete/TwitterAutoComplete.java | 2 +-
.../sample/cookbook/MaxPerKeyExamples.java | 2 +-
.../apex/malhar/lib/window/Accumulation.java | 2 +-
.../lib/window/SessionWindowedStorage.java | 11 +-
.../apache/apex/malhar/lib/window/Tuple.java | 70 +++++++++-
.../apache/apex/malhar/lib/window/Window.java | 125 +++++++++++------
.../apex/malhar/lib/window/WindowOption.java | 4 +-
.../malhar/lib/window/WindowedKeyedStorage.java | 81 -----------
.../malhar/lib/window/WindowedOperator.java | 7 -
.../apex/malhar/lib/window/WindowedStorage.java | 98 +++++++++-----
.../window/impl/AbstractWindowedOperator.java | 135 ++++++++++++++-----
.../impl/InMemorySessionWindowedStorage.java | 87 ++++++++++++
.../impl/InMemoryWindowedKeyedStorage.java | 42 +-----
.../window/impl/InMemoryWindowedStorage.java | 22 ++-
.../window/impl/KeyedWindowedOperatorImpl.java | 14 +-
.../lib/window/impl/WindowedOperatorImpl.java | 2 +-
.../malhar/lib/window/WindowedOperatorTest.java | 44 +++---
.../apex/malhar/stream/api/util/TupleUtil.java | 13 +-
20 files changed, 481 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 6b208aa..2db59b6 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -208,7 +208,7 @@ public class AutoComplete implements StreamingApplication
public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
{
// TODO: Should be removed after Auto-wrapping is supported.
- return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
}
});
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index 5ac3e7f..68ec733 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -28,6 +28,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.stream.api.ApexStream;
@@ -269,7 +270,8 @@ public class TopWikipediaSessions implements StreamingApplication
@Override
public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
{
- return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
+ Window window = input.getWindows().iterator().next();
+ return new TempWrapper(input.getValue(), window.getBeginTimestamp());
}
}, name("TempWrapper"))
@@ -290,14 +292,15 @@ public class TopWikipediaSessions implements StreamingApplication
@Override
public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
{
- return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), new KeyValPair<String, Long>(
- input.getValue().getKey() + " : " + input.getWindows().get(0).getBeginTimestamp() + " : " + input.getWindows().get(0).getDurationMillis(),
+ Window window = input.getWindows().iterator().next();
+ return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>(
+ input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(),
input.getValue().getValue()));
}
}
/**
- * A flapmap function that turns the result into readable format.
+ * A flatmap function that turns the result into readable format.
*/
static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
{
@@ -308,7 +311,8 @@ public class TopWikipediaSessions implements StreamingApplication
for (TempWrapper item : input.getValue()) {
String session = item.getValue().getKey();
long count = item.getValue().getValue();
- result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp());
+ Window window = input.getWindows().iterator().next();
+ result.add(session + " + " + count + " : " + window.getBeginTimestamp());
}
return result;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index ffd2a03..4fc80ea 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -125,7 +125,7 @@ public class TwitterAutoComplete implements StreamingApplication
public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
{
// TODO: Should be removed after Auto-wrapping is supported.
- return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
}
}, name("TopNByKey"));
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 4fafa5a..9fd9495 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -110,7 +110,7 @@ public class MaxPerKeyExamples implements StreamingApplication
@Override
public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
{
- return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
+ return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input);
}
}, name("MaxPerMonth"));
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
index 89215a1..03f7ff7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java
@@ -51,7 +51,7 @@ public interface Accumulation<InputT, AccumT, OutputT>
AccumT accumulate(AccumT accumulatedValue, InputT input);
/**
- * Merges two accumulated value into one
+ * Merges two accumulated values into one
*
* @param accumulatedValue1
* @param accumulatedValue2
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 404e591..4cb2b1a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -32,9 +32,18 @@ import org.apache.hadoop.classification.InterfaceStability;
* @since 3.5.0
*/
@InterfaceStability.Evolving
-public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V>
+public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKeyedStorage<K, V>
{
/**
+ * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
+ * data to toWindow, and overwrite any existing data in toWindow
+ *
+ * @param fromWindow
+ * @param toWindow
+ */
+ void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
+
+ /**
* Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap.
* This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge
* session windows.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
index eaf4d29..aea6bf6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Tuple.java
@@ -18,8 +18,9 @@
*/
package org.apache.apex.malhar.lib.window;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceStability;
@@ -72,6 +73,27 @@ public interface Tuple<T>
{
return value.toString();
}
+
+ @Override
+ public int hashCode()
+ {
+ return value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof PlainTuple) {
+ PlainTuple<T> other = (PlainTuple<T>)obj;
+ if (this.value == null) {
+ return other.value == null;
+ } else {
+ return this.value.equals(other.value);
+ }
+ } else {
+ return false;
+ }
+ }
}
/**
@@ -103,6 +125,23 @@ public interface Tuple<T>
{
this.timestamp = timestamp;
}
+
+ @Override
+ public int hashCode()
+ {
+ return super.hashCode() ^ (int)(timestamp & 0xffffff);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof TimestampedTuple && super.equals(obj)) {
+ TimestampedTuple<T> other = (TimestampedTuple<T>)obj;
+ return (this.timestamp == other.timestamp);
+ } else {
+ return false;
+ }
+ }
}
/**
@@ -112,7 +151,7 @@ public interface Tuple<T>
*/
class WindowedTuple<T> extends TimestampedTuple<T>
{
- private List<Window> windows = new ArrayList<>();
+ private Set<Window> windows = new TreeSet<>();
public WindowedTuple()
{
@@ -124,7 +163,13 @@ public interface Tuple<T>
this.windows.add(window);
}
- public List<Window> getWindows()
+ public WindowedTuple(Collection<? extends Window> windows, long timestamp, T value)
+ {
+ super(timestamp, value);
+ this.windows.addAll(windows);
+ }
+
+ public Collection<Window> getWindows()
{
return windows;
}
@@ -133,6 +178,23 @@ public interface Tuple<T>
{
this.windows.add(window);
}
+
+ @Override
+ public int hashCode()
+ {
+ return super.hashCode() ^ windows.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof WindowedTuple && super.equals(obj)) {
+ WindowedTuple<T> other = (WindowedTuple<T>)obj;
+ return this.windows.equals(other.windows);
+ } else {
+ return false;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
index 32a028a..50d6445 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java
@@ -18,27 +18,33 @@
*/
package org.apache.apex.malhar.lib.window;
-import java.util.Comparator;
-
import org.apache.hadoop.classification.InterfaceStability;
/**
- * This interface describes the individual window.
+ * This interface describes the classes that represent individual windows.
+ *
+ * @param <WINDOW> the window type that instances of this class can be compared to
*
* @since 3.5.0
*/
@InterfaceStability.Evolving
-public interface Window
+public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WINDOW>
{
long getBeginTimestamp();
long getDurationMillis();
/**
- * Global window means there is only one window, or no window depending on how you look at it.
+ * Global window means there is only one window that spans the entire lifetime of the application.
*/
- class GlobalWindow implements Window
+ class GlobalWindow implements Window<GlobalWindow>
{
+
+ /**
+ * The singleton global window
+ */
+ public static final GlobalWindow INSTANCE = new GlobalWindow();
+
private GlobalWindow()
{
}
@@ -57,56 +63,37 @@ public interface Window
@Override
public boolean equals(Object other)
{
- return (other instanceof GlobalWindow);
+ return this == other;
}
- }
- class DefaultComparator implements Comparator<Window>
- {
- private DefaultComparator()
+ @Override
+ public int compareTo(GlobalWindow o)
{
+ return 0;
}
@Override
- public int compare(Window o1, Window o2)
+ public String toString()
{
- if (o1.getBeginTimestamp() < o2.getBeginTimestamp()) {
- return -1;
- } else if (o1.getBeginTimestamp() > o2.getBeginTimestamp()) {
- return 1;
- } else if (o1.getDurationMillis() < o2.getDurationMillis()) {
- return -1;
- } else if (o1.getDurationMillis() > o2.getDurationMillis()) {
- return 1;
- } else if (o1 instanceof SessionWindow && o2 instanceof SessionWindow) {
- return Long.compare(((SessionWindow)o1).getKey().hashCode(), ((SessionWindow)o2).getKey().hashCode());
- } else {
- return 0;
- }
+ return "[GlobalWindow]";
}
}
/**
- * The singleton global window
- */
- GlobalWindow GLOBAL_WINDOW = new GlobalWindow();
-
- /**
- * The singleton default comparator of windows
- */
- Comparator<Window> DEFAULT_COMPARATOR = new DefaultComparator();
-
- /**
* TimeWindow is a window that represents a time slice
+ *
+ * @param <WINDOW> the window type that instances of this class can be compared to
*/
- class TimeWindow implements Window
+ class TimeWindow<WINDOW extends TimeWindow<WINDOW>> implements Window<WINDOW>
{
- protected long beginTimestamp;
- protected long durationMillis;
+ protected final long beginTimestamp;
+ protected final long durationMillis;
private TimeWindow()
{
// for kryo
+ this.beginTimestamp = -1;
+ this.durationMillis = 0;
}
public TimeWindow(long beginTimestamp, long durationMillis)
@@ -148,20 +135,49 @@ public interface Window
}
}
+ @Override
+ public int hashCode()
+ {
+ int result = (int)(beginTimestamp ^ (beginTimestamp >>> 32));
+ result = 31 * result + (int)(durationMillis ^ (durationMillis >>> 32));
+ return result;
+ }
+
+ @Override
+ public int compareTo(TimeWindow o)
+ {
+ if (this.getBeginTimestamp() < o.getBeginTimestamp()) {
+ return -1;
+ } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) {
+ return 1;
+ } else if (this.getDurationMillis() < o.getDurationMillis()) {
+ return -1;
+ } else if (this.getDurationMillis() > o.getDurationMillis()) {
+ return 1;
+ }
+ return 0;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[TimeWindow " + getBeginTimestamp() + "(" + getDurationMillis() + ")]";
+ }
}
/**
* SessionWindow is a window that represents a time slice for a key, with the time slice being variable length.
*
- * @param <K>
+ * @param <K> the key type for the session
*/
- class SessionWindow<K> extends TimeWindow
+ class SessionWindow<K> extends TimeWindow<SessionWindow<K>>
{
- private K key;
+ private final K key;
private SessionWindow()
{
// for kryo
+ this.key = null;
}
public SessionWindow(K key, long beginTimestamp, long duration)
@@ -192,5 +208,32 @@ public interface Window
return false;
}
}
+
+ @Override
+ public int hashCode()
+ {
+ return (key.hashCode() << 16) | super.hashCode();
+ }
+
+ @Override
+ public int compareTo(SessionWindow<K> o)
+ {
+ int val = super.compareTo(o);
+ if (val == 0) {
+ if (this.getKey() instanceof Comparable) {
+ return ((Comparable<K>)this.getKey()).compareTo(o.getKey());
+ } else {
+ return Long.compare(this.getKey().hashCode(), o.getKey().hashCode());
+ }
+ } else {
+ return val;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[SessionWindow key=" + getKey() + " " + getBeginTimestamp() + "(" + getDurationMillis() + ")]";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
index de244fb..099709d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowOption.java
@@ -63,7 +63,7 @@ public interface WindowOption
/**
* Gets the duration of the time window
*
- * @return
+ * @return the duration of the time window
*/
public Duration getDuration()
{
@@ -74,7 +74,7 @@ public interface WindowOption
* The time window should be a sliding window with the given slide duration
*
* @param duration
- * @return
+ * @return the SlidingTimeWindows
*/
public SlidingTimeWindows slideBy(Duration duration)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
deleted file mode 100644
index d59ee40..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedKeyedStorage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window;
-
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This interface is for a key/value store for storing data for windowed streams.
- * The key to this key/value store is a pair of (Window, K).
- * Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
- * in the near future.
- *
- * Note that this interface expects that the implementation takes care of checkpoint recovery.
- *
- *
- * @since 3.5.0
- */
-@InterfaceStability.Unstable
-public interface WindowedKeyedStorage<K, V> extends WindowedStorage<Map<K, V>>
-{
- /**
- * Sets the data associated with the given window and the key
- *
- * @param window
- * @param key
- * @param value
- */
- void put(Window window, K key, V value);
-
- /**
- * Gets the key/value pairs associated with the given window
- *
- * @param window
- * @return
- */
- Iterable<Map.Entry<K, V>> entrySet(Window window);
-
- /**
- * Gets the data associated with the given window and the key
- *
- * @param window
- * @param key
- * @return
- */
- V get(Window window, K key);
-
- /**
- * Removes all the data associated with the given window
- *
- * @param window
- */
- void remove(Window window);
-
- /**
- * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
- * data to toWindow, and overwrite any existing data in toWindow
- *
- * @param fromWindow
- * @param toWindow
- */
- void migrateWindow(Window fromWindow, Window toWindow);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
index 5da531c..ccc7ae1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedOperator.java
@@ -61,13 +61,6 @@ public interface WindowedOperator<InputT>
void setAllowedLateness(Duration allowedLateness);
/**
- * This methods sets the storage for the meta data for each window
- *
- * @param storageAgent
- */
- void setWindowStateStorage(WindowedStorage<WindowState> storageAgent);
-
- /**
* This sets the function that extracts the timestamp from the input tuple
*
* @param timestampExtractor
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
index c2b3f08..42ecdae 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
@@ -20,23 +20,19 @@ package org.apache.apex.malhar.lib.window;
import java.util.Map;
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+
/**
* WindowedStorage is a key-value store with the key being the window. The implementation of this interface should
* make sure checkpointing and recovery will be done correctly.
- * Note that this interface may go away soon as there are plans to incorporate {@link Spillable} data structures in the
- * near future.
- *
- * @param <T> The type of the data that is stored per window
- *
- * TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026
*
* @since 3.5.0
*/
@InterfaceStability.Unstable
-public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
+public interface WindowedStorage extends Component<Context.OperatorContext>
{
/**
* Returns true if the storage contains this window
@@ -53,22 +49,6 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
long size();
/**
- * Sets the data associated with the given window
- *
- * @param window
- * @param value
- */
- void put(Window window, T value);
-
- /**
- * Gets the value associated with the given window
- *
- * @param window
- * @return
- */
- T get(Window window);
-
- /**
* Removes all the data associated with the given window. This does NOT mean removing the window in checkpointed state
*
* @param window
@@ -76,18 +56,72 @@ public interface WindowedStorage<T> extends Iterable<Map.Entry<Window, T>>
void remove(Window window);
/**
- * Migrate the data from one window to another. This will invalidate fromWindow in the storage and move the
- * data to toWindow, and overwrite any existing data in toWindow
+ * This interface handles a plain value per window. If there is a key/value map for each window, use
+ * {@link WindowedKeyedStorage}. Also note that a single T object is assumed to fit in memory.
*
- * @param fromWindow
- * @param toWindow
+ * Note that this interface expects that the implementation takes care of checkpoint recovery.
+ *
+ * @param <T> The type of the data that is stored per window
*/
- void migrateWindow(Window fromWindow, Window toWindow);
+ interface WindowedPlainStorage<T> extends WindowedStorage
+ {
+ /**
+ * Sets the data associated with the given window
+ *
+ * @param window
+ * @param value
+ */
+ void put(Window window, T value);
+
+ /**
+ * Gets the value associated with the given window
+ *
+ * @param window
+ * @return
+ */
+ T get(Window window);
+
+ /**
+ * Returns the iterable of the entries in the storage
+ *
+ * @return
+ */
+ Iterable<Map.Entry<Window, T>> entries();
+ }
/**
- * Returns the iterable of the entries in the storage
+ * This interface is for a store that maps Windows to maps of key value pairs.
+ *
+ * Note that this interface expects that the implementation takes care of checkpoint recovery.
*
- * @return
*/
- Iterable<Map.Entry<Window, T>> entrySet();
+ interface WindowedKeyedStorage<K, V> extends WindowedStorage
+ {
+ /**
+ * Sets the data associated with the given window and the key
+ *
+ * @param window
+ * @param key
+ * @param value
+ */
+ void put(Window window, K key, V value);
+
+ /**
+ * Gets an iterable object over the key/value pairs associated with the given window
+ *
+ * @param window
+ * @return
+ */
+ Iterable<Map.Entry<K, V>> entries(Window window);
+
+ /**
+ * Gets the data associated with the given window and the key
+ *
+ * @param window
+ * @param key
+ * @return
+ */
+ V get(Window window, K key);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 7abe9b6..f90d47d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -19,6 +19,9 @@
package org.apache.apex.malhar.lib.window.impl;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -29,6 +32,7 @@ import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
@@ -42,9 +46,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Function;
+import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
@@ -63,13 +69,13 @@ import com.datatorrent.common.util.BaseOperator;
*/
@InterfaceStability.Evolving
public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation>
- extends BaseOperator implements WindowedOperator<InputT>
+ extends BaseOperator implements WindowedOperator<InputT>, Operator.CheckpointNotificationListener
{
protected WindowOption windowOption;
protected TriggerOption triggerOption;
protected long allowedLatenessMillis = -1;
- protected WindowedStorage<WindowState> windowStateMap;
+ protected WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap;
private Function<InputT, Long> timestampExtractor;
@@ -81,12 +87,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
private long lateTriggerCount;
private long lateTriggerMillis;
private long currentDerivedTimestamp = -1;
- private long windowWidthMillis;
+ private long timeIncrement;
private long fixedWatermarkMillis = -1;
+
+ private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
+
protected DataStorageT dataStorage;
protected RetractionStorageT retractionStorage;
protected AccumulationT accumulation;
+ private static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE);
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
@@ -156,9 +166,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
public void setWindowOption(WindowOption windowOption)
{
this.windowOption = windowOption;
- if (this.windowOption instanceof WindowOption.GlobalWindow) {
- windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState());
- }
}
@Override
@@ -216,6 +223,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
this.retractionStorage = storageAgent;
}
+ public void addComponent(String key, Component<Context.OperatorContext> component)
+ {
+ components.put(key, component);
+ }
+
/**
* Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
* to put in the pane when a trigger is fired
@@ -227,8 +239,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
this.accumulation = accumulation;
}
- @Override
- public void setWindowStateStorage(WindowedStorage<WindowState> storageAgent)
+ public void setWindowStateStorage(WindowedStorage.WindowedPlainStorage<WindowState> storageAgent)
{
this.windowStateMap = storageAgent;
}
@@ -284,12 +295,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
if (windowOption == null && input instanceof Tuple.WindowedTuple) {
// inherit the windows from upstream
return (Tuple.WindowedTuple<InputT>)input;
+ } else {
+ return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue());
}
- Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>();
- windowedTuple.setValue(input.getValue());
- windowedTuple.setTimestamp(extractTimestamp(input));
- assignWindows(windowedTuple.getWindows(), input);
- return windowedTuple;
}
private long extractTimestamp(Tuple<InputT> tuple)
@@ -305,29 +313,31 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
- private void assignWindows(List<Window> windows, Tuple<InputT> inputTuple)
+ private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple)
{
if (windowOption instanceof WindowOption.GlobalWindow) {
- windows.add(Window.GLOBAL_WINDOW);
+ return GLOBAL_WINDOW_SINGLETON_SET;
} else {
long timestamp = extractTimestamp(inputTuple);
if (windowOption instanceof WindowOption.TimeWindows) {
-
- for (Window.TimeWindow window : getTimeWindowsForTimestamp(timestamp)) {
+ Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp);
+ for (Window window : windows) {
if (!windowStateMap.containsWindow(window)) {
windowStateMap.put(window, new WindowState());
}
- windows.add(window);
}
+ return windows;
} else if (windowOption instanceof WindowOption.SessionWindows) {
- assignSessionWindows(windows, timestamp, inputTuple);
+ return assignSessionWindows(timestamp, inputTuple);
+ } else {
+ throw new IllegalStateException("Unsupported Window Option: " + windowOption.getClass());
}
}
}
- protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<InputT> inputTuple)
+ protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple)
{
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Session window require keyed tuples");
}
/**
@@ -338,7 +348,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* @param timestamp
* @return
*/
- private List<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
+ private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
{
List<Window.TimeWindow> windows = new ArrayList<>();
if (windowOption instanceof WindowOption.TimeWindows) {
@@ -348,8 +358,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
if (windowOption instanceof WindowOption.SlidingTimeWindows) {
long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis();
// add the sliding windows front and back
- // Note: this messes up the order of the window and we might want to revisit this if the order of the windows
- // matter
for (long slideBeginTimestamp = beginTimestamp - slideBy;
slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
slideBeginTimestamp -= slideBy) {
@@ -389,8 +397,32 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
@Override
public void setup(Context.OperatorContext context)
{
- this.windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+ this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
validate();
+ windowStateMap.setup(context);
+ dataStorage.setup(context);
+ if (retractionStorage != null) {
+ retractionStorage.setup(context);
+ }
+ for (Component component : components.values()) {
+ component.setup(context);
+ }
+ if (this.windowOption instanceof WindowOption.GlobalWindow) {
+ windowStateMap.put(Window.GlobalWindow.INSTANCE, new WindowState());
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ windowStateMap.teardown();
+ dataStorage.teardown();
+ if (retractionStorage != null) {
+ retractionStorage.teardown();
+ }
+ for (Component component : components.values()) {
+ component.teardown();
+ }
}
/**
@@ -399,10 +431,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
@Override
public void beginWindow(long windowId)
{
+ for (Component component : components.values()) {
+ if (component instanceof WindowListener) {
+ ((WindowListener)component).beginWindow(windowId);
+ }
+ }
if (currentDerivedTimestamp == -1) {
- currentDerivedTimestamp = ((windowId >> 32) * 1000) + (windowId & 0xffffffffL);
+ // TODO: once we are able to get the firstWindowMillis from Apex Core API, we should use that instead
+ currentDerivedTimestamp = (windowId >> 32) * 1000;
} else {
- currentDerivedTimestamp += windowWidthMillis;
+ currentDerivedTimestamp += timeIncrement;
}
watermarkTimestamp = -1;
}
@@ -417,6 +455,12 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
// TODO: May want to revisit this if the application cares more about latency than idempotency
processWatermarkAtEndWindow();
fireTimeTriggers();
+
+ for (Component component : components.values()) {
+ if (component instanceof WindowListener) {
+ ((WindowListener)component).endWindow();
+ }
+ }
}
private void processWatermarkAtEndWindow()
@@ -429,7 +473,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
long horizon = watermarkTimestamp - allowedLatenessMillis;
- for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.iterator(); it.hasNext(); ) {
+ for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) {
Map.Entry<Window, WindowState> entry = it.next();
Window window = entry.getKey();
WindowState windowState = entry.getValue();
@@ -446,7 +490,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
// discard this window because it's too late now
it.remove();
dataStorage.remove(window);
- retractionStorage.remove(window);
+ if (retractionStorage != null) {
+ retractionStorage.remove(window);
+ }
}
}
}
@@ -457,7 +503,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
private void fireTimeTriggers()
{
if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) {
- for (Map.Entry<Window, WindowState> entry : windowStateMap.entrySet()) {
+ for (Map.Entry<Window, WindowState> entry : windowStateMap.entries()) {
Window window = entry.getKey();
WindowState windowState = entry.getValue();
if (windowState.watermarkArrivalTime == -1) {
@@ -511,4 +557,33 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
dataStorage.remove(window);
}
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ for (Component component : components.values()) {
+ if (component instanceof CheckpointNotificationListener) {
+ ((CheckpointNotificationListener)component).beforeCheckpoint(windowId);
+ }
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ for (Component component : components.values()) {
+ if (component instanceof CheckpointNotificationListener) {
+ ((CheckpointNotificationListener)component).checkpointed(windowId);
+ }
+ }
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ for (Component component : components.values()) {
+ if (component instanceof CheckpointNotificationListener) {
+ ((CheckpointNotificationListener)component).committed(windowId);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
new file mode 100644
index 0000000..fdceb4d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the in-memory implementation of {@link SessionWindowedStorage}. Do not use this class if your state is too
+ * large to fit in memory.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Unstable
+public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedStorage<K, V>
+ implements SessionWindowedStorage<K, V>
+{
+ private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new HashMap<>();
+
+ @Override
+ public void put(Window window, K key, V value)
+ {
+ super.put(window, key, value);
+ TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+ if (sessionWindows == null) {
+ sessionWindows = new TreeSet<>();
+ keyToWindows.put(key, sessionWindows);
+ }
+ sessionWindows.add((Window.SessionWindow<K>)window);
+ }
+
+ @Override
+ public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
+ {
+ if (containsWindow(fromWindow)) {
+ map.put(toWindow, map.remove(fromWindow));
+ }
+ }
+
+ @Override
+ public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
+ {
+ List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
+ TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+ if (sessionWindows != null) {
+ Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1);
+ Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
+ if (floor != null) {
+ if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > timestamp) {
+ results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key)));
+ }
+ }
+ Window.SessionWindow<K> higher = sessionWindows.higher(refWindow);
+ if (higher != null) {
+ if (higher.getBeginTimestamp() - gap <= timestamp) {
+ results.add(new AbstractMap.SimpleEntry<>(higher, map.get(higher).get(key)));
+ }
+ }
+ }
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
index 7dbfbb1..4b47edc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedKeyedStorage.java
@@ -18,31 +18,23 @@
*/
package org.apache.apex.malhar.lib.window.impl;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that
- * can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data structures
- * in the near future.
+ * can't be fit in memory.
*
* @since 3.5.0
*/
@InterfaceStability.Unstable
public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<Map<K, V>>
- implements WindowedKeyedStorage<K, V>, SessionWindowedStorage<K, V>
+ implements WindowedStorage.WindowedKeyedStorage<K, V>
{
@Override
public void put(Window window, K key, V value)
@@ -51,14 +43,13 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<
if (map.containsKey(window)) {
kvMap = map.get(window);
} else {
- kvMap = new HashMap<K, V>();
+ kvMap = new HashMap<>();
map.put(window, kvMap);
}
kvMap.put(key, value);
}
- @Override
- public Set<Map.Entry<K, V>> entrySet(Window window)
+ public Iterable<Map.Entry<K, V>> entries(Window window)
{
if (map.containsKey(window)) {
return map.get(window).entrySet();
@@ -77,27 +68,4 @@ public class InMemoryWindowedKeyedStorage<K, V> extends InMemoryWindowedStorage<
}
}
- @Override
- public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
- {
- List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
- // TODO: this is inefficient, but this is usually not used in a real use case since it's in memory
- for (Map.Entry<Window, Map<K, V>> entry : map.entrySet()) {
- Window.SessionWindow<K> window = (Window.SessionWindow<K>)entry.getKey();
- if (key.equals(window.getKey())) {
- if (timestamp > window.getBeginTimestamp()) {
- if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
- results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
- }
- } else if (timestamp < window.getBeginTimestamp()) {
- if (window.getBeginTimestamp() - gap <= timestamp) {
- results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
- }
- } else {
- results.add(new AbstractMap.SimpleEntry<>(window, entry.getValue().get(key)));
- }
- }
- }
- return results;
- }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
index f6de894..db18a40 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.lib.window.impl;
-import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
@@ -27,17 +26,19 @@ import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.Context;
+
/**
- * This is the in-memory implementation of {@link WindowedStorage}. Do not use this class if you have a large state that
+ * This is the in-memory implementation of {@link WindowedStorage.WindowedPlainStorage}. Do not use this class if you have a large state that
* can't be fit in memory. Also, this class may go away soon as there are plans to incorporate {@link Spillable} data
* structures in the near future.
*
* @since 3.5.0
*/
@InterfaceStability.Unstable
-public class InMemoryWindowedStorage<T> implements WindowedStorage<T>
+public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
{
- protected final TreeMap<Window, T> map = new TreeMap<>(Window.DEFAULT_COMPARATOR);
+ protected final TreeMap<Window, T> map = new TreeMap<>();
@Override
public long size()
@@ -70,22 +71,19 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage<T>
}
@Override
- public void migrateWindow(Window fromWindow, Window toWindow)
+ public Iterable<Map.Entry<Window, T>> entries()
{
- if (containsWindow(fromWindow)) {
- map.put(toWindow, map.remove(fromWindow));
- }
+ return map.entrySet();
}
@Override
- public Iterable<Map.Entry<Window, T>> entrySet()
+ public void setup(Context.OperatorContext context)
{
- return map.entrySet();
}
@Override
- public Iterator<Map.Entry<Window, T>> iterator()
+ public void teardown()
{
- return map.entrySet().iterator();
}
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index 7077c96..a38207a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -19,8 +19,8 @@
package org.apache.apex.malhar.lib.window.impl;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.apex.malhar.lib.window.Accumulation;
@@ -30,7 +30,7 @@ import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
-import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.lib.util.KeyValPair;
@@ -48,11 +48,11 @@ import com.datatorrent.lib.util.KeyValPair;
*/
@InterfaceStability.Evolving
public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
- extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedKeyedStorage<KeyT, AccumT>, WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+ extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
{
@Override
- protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
+ protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
{
KeyT key = inputTuple.getValue().getKey();
WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
@@ -126,7 +126,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
default:
throw new IllegalStateException("There are more than two sessions matching one timestamp");
}
- windows.add(sessionWindowToAssign);
+ return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
}
@Override
@@ -147,7 +147,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
@Override
public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
{
- for (Map.Entry<KeyT, AccumT> entry : dataStorage.entrySet(window)) {
+ for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
OutputValT outputVal = accumulation.getOutput(entry.getValue());
if (fireOnlyUpdatedPanes) {
OutputValT oldValue = retractionStorage.get(window, entry.getKey());
@@ -168,7 +168,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
throw new UnsupportedOperationException();
}
- for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entrySet(window)) {
+ for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) {
output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index d195004..7275d88 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceStability.Evolving
public class WindowedOperatorImpl<InputT, AccumT, OutputT>
- extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage<AccumT>, WindowedStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+ extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
{
@Override
public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index f8f9d8a..7396994 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -18,9 +18,8 @@
*/
package org.apache.apex.malhar.lib.window;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.validation.ValidationException;
@@ -29,6 +28,7 @@ import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
@@ -112,8 +112,8 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setAllowedLateness(Duration.millis(1000));
- WindowedStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
- WindowedStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
+ WindowedStorage.WindowedPlainStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
+ WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
windowedOperator.setDataStorage(dataStorage);
windowedOperator.setWindowStateStorage(windowStateStorage);
@@ -124,7 +124,7 @@ public class WindowedOperatorTest
Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
- Map.Entry<Window, WindowState> entry = windowStateStorage.entrySet().iterator().next();
+ Map.Entry<Window, WindowState> entry = windowStateStorage.entries().iterator().next();
Window window = entry.getKey();
WindowState windowState = entry.getValue();
Assert.assertEquals(-1, windowState.watermarkArrivalTime);
@@ -135,7 +135,7 @@ public class WindowedOperatorTest
windowedOperator.processWatermark(new WatermarkImpl(1200));
windowedOperator.endWindow();
- Assert.assertTrue(windowState.watermarkArrivalTime > 0);
+ Assert.assertTrue(windowState.watermarkArrivalTime >= 0);
Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
windowedOperator.beginWindow(2);
@@ -285,9 +285,9 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
windowedOperator.setup(context);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
- List<Window> windows = windowedValue.getWindows();
+ Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
- Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0));
+ Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next());
}
@Test
@@ -299,10 +299,11 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
windowedOperator.setup(context);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
- List<Window> windows = windowedValue.getWindows();
+ Collection<? extends Window> windows = windowedValue.getWindows();
Assert.assertEquals(1, windows.size());
- Assert.assertEquals(1000, windows.get(0).getBeginTimestamp());
- Assert.assertEquals(1000, windows.get(0).getDurationMillis());
+ Window window = windows.iterator().next();
+ Assert.assertEquals(1000, window.getBeginTimestamp());
+ Assert.assertEquals(1000, window.getDurationMillis());
}
@Test
@@ -314,9 +315,8 @@ public class WindowedOperatorTest
windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
windowedOperator.setup(context);
Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
- List<Window> windows = windowedValue.getWindows();
+ Collection<? extends Window> windows = windowedValue.getWindows();
Window[] winArray = windows.toArray(new Window[]{});
- Arrays.sort(winArray, Window.DEFAULT_COMPARATOR);
Assert.assertEquals(5, winArray.length);
Assert.assertEquals(800, winArray[0].getBeginTimestamp());
Assert.assertEquals(1000, winArray[0].getDurationMillis());
@@ -336,6 +336,8 @@ public class WindowedOperatorTest
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
new Attribute.AttributeMap.DefaultAttributeMap());
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+ windowedOperator.setDataStorage(new InMemorySessionWindowedStorage<String, MutableLong>());
+ windowedOperator.setRetractionStorage(new InMemorySessionWindowedStorage<String, Long>());
windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
@@ -348,7 +350,7 @@ public class WindowedOperatorTest
Assert.assertEquals(1, sink.getCount(false));
Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
- Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().get(0);
+ Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window1.getBeginTimestamp());
Assert.assertEquals(1, window1.getDurationMillis());
Assert.assertEquals("a", window1.getKey());
@@ -364,13 +366,13 @@ public class WindowedOperatorTest
// retraction trigger
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
- Assert.assertEquals(window1, out.getWindows().get(0));
+ Assert.assertEquals(window1, out.getWindows().iterator().next());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(-2L, out.getValue().getValue().longValue());
// normal trigger
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
- Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().get(0);
+ Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window2.getBeginTimestamp());
Assert.assertEquals(901, window2.getDurationMillis());
@@ -384,7 +386,7 @@ public class WindowedOperatorTest
Assert.assertEquals(1, sink.getCount(false));
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
- Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().get(0);
+ Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(5000L, window3.getBeginTimestamp());
Assert.assertEquals(1, window3.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
@@ -400,19 +402,19 @@ public class WindowedOperatorTest
// retraction of the two old windows
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
Assert.assertEquals(1, out.getWindows().size());
- Assert.assertEquals(window2, out.getWindows().get(0));
+ Assert.assertEquals(window2, out.getWindows().iterator().next());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(-5L, out.getValue().getValue().longValue());
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
Assert.assertEquals(1, out.getWindows().size());
- Assert.assertEquals(window3, out.getWindows().get(0));
+ Assert.assertEquals(window3, out.getWindows().iterator().next());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(-4L, out.getValue().getValue().longValue());
// normal trigger
out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
Assert.assertEquals(1, out.getWindows().size());
- Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().get(0);
+ Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window4.getBeginTimestamp());
Assert.assertEquals(3901, window4.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
@@ -428,7 +430,7 @@ public class WindowedOperatorTest
new Attribute.AttributeMap.DefaultAttributeMap());
KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
- WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
+ WindowedStorage.WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
windowedOperator.setDataStorage(dataStorage);
windowedOperator.setup(context);
windowedOperator.beginWindow(1);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f9da0ee/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
index 04f42b3..f9a4ed8 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -18,10 +18,7 @@
*/
package org.apache.apex.malhar.stream.api.util;
-import java.util.List;
-
import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
/**
* The tuple util will be used to extract fields that are used as key or value<br>
@@ -38,14 +35,8 @@ public class TupleUtil
{
if (t instanceof Tuple.WindowedTuple) {
- Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>();
- List<Window> wins = ((Tuple.WindowedTuple)t).getWindows();
- for (Window w : wins) {
- newT.addWindow(w);
- }
- newT.setValue(newValue);
- ((Tuple.WindowedTuple)t).setTimestamp(((Tuple.WindowedTuple)t).getTimestamp());
- return newT;
+ Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t;
+ return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue);
} else if (t instanceof Tuple.TimestampedTuple) {
return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
} else {