You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/10/19 19:41:49 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2307 #resolve delete the
windows properly after merge or extend
Repository: apex-malhar
Updated Branches:
refs/heads/master 2f308aa21 -> 2cf8bade8
APEXMALHAR-2307 #resolve delete the windows properly after merge or extend
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2cf8bade
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2cf8bade
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2cf8bade
Branch: refs/heads/master
Commit: 2cf8bade8f2b4505a915ff3cfca79424f5ec2df1
Parents: 5131bee
Author: David Yan <da...@datatorrent.com>
Authored: Tue Oct 18 18:18:58 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Oct 19 12:36:20 2016 -0700
----------------------------------------------------------------------
.../impl/InMemorySessionWindowedStorage.java | 56 ++++++++++++++------
.../impl/SpillableSessionWindowedStorage.java | 27 +++++-----
2 files changed, 51 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index 0247cbc..b696937 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -24,12 +24,18 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.hadoop.classification.InterfaceStability;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SortedSetMultimap;
+
/**
* This is the in-memory implementation of {@link WindowedKeyedStorage}. Do not use this class if you have a large state that
* can't be fit in memory.
@@ -40,7 +46,14 @@ import org.apache.hadoop.classification.InterfaceStability;
public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedStorage<K, V>
implements SessionWindowedStorage<K, V>
{
- private Map<K, TreeSet<Window.SessionWindow<K>>> keyToWindows = new HashMap<>();
+ private SortedSetMultimap<K, Window.SessionWindow<K>> keyToWindows = Multimaps.newSortedSetMultimap(new HashMap<K, Collection<Window.SessionWindow<K>>>(), new Supplier<SortedSet<Window.SessionWindow<K>>>()
+ {
+ @Override
+ public SortedSet<Window.SessionWindow<K>> get()
+ {
+ return new TreeSet<>();
+ }
+ });
@Override
public void put(Window window, K key, V value)
@@ -48,37 +61,46 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS
@SuppressWarnings("unchecked")
Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
super.put(window, key, value);
- TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
- if (sessionWindows == null) {
- sessionWindows = new TreeSet<>();
- keyToWindows.put(key, sessionWindows);
- }
- sessionWindows.add(sessionWindow);
+ keyToWindows.put(key, sessionWindow);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void remove(Window window)
+ {
+ super.remove(window);
+ Window.SessionWindow<K> sessionWindow = (Window.SessionWindow<K>)window;
+ keyToWindows.remove(sessionWindow.getKey(), sessionWindow);
}
@Override
public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
{
- if (containsWindow(fromWindow)) {
- map.put(toWindow, map.remove(fromWindow));
+ if (!containsWindow(fromWindow)) {
+ throw new NoSuchElementException();
}
+ map.put(toWindow, map.remove(fromWindow));
+ keyToWindows.remove(fromWindow.getKey(), fromWindow);
+ keyToWindows.put(toWindow.getKey(), toWindow);
}
@Override
public Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap)
{
List<Map.Entry<Window.SessionWindow<K>, V>> results = new ArrayList<>();
- TreeSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
+ SortedSet<Window.SessionWindow<K>> sessionWindows = keyToWindows.get(key);
if (sessionWindows != null) {
- Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1);
- Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
- if (floor != null) {
- if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) {
- results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key)));
+ Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, gap);
+ SortedSet<Window.SessionWindow<K>> headSet = sessionWindows.headSet(refWindow);
+ if (!headSet.isEmpty()) {
+ Window.SessionWindow<K> lower = headSet.last();
+ if (lower.getBeginTimestamp() + lower.getDurationMillis() > timestamp) {
+ results.add(new AbstractMap.SimpleEntry<>(lower, map.get(lower).get(key)));
}
}
- Window.SessionWindow<K> higher = sessionWindows.higher(refWindow);
- if (higher != null) {
+ SortedSet<Window.SessionWindow<K>> tailSet = sessionWindows.tailSet(refWindow);
+ if (!tailSet.isEmpty()) {
+ Window.SessionWindow<K> higher = tailSet.first();
if (higher.getBeginTimestamp() - gap <= timestamp) {
results.add(new AbstractMap.SimpleEntry<>(higher, map.get(higher).get(key)));
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cf8bade/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index 5d39930..da44fb1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
@@ -79,23 +80,19 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
@Override
public void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow)
{
- Set<K> keys = windowToKeysMap.get(fromWindow);
- if (keys == null) {
- return;
- }
- windowKeyToValueMap.remove(toWindow);
- for (K key : keys) {
- windowToKeysMap.put(toWindow, key);
- ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key);
- ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key);
-
- V value = windowKeyToValueMap.get(oldKey);
- windowKeyToValueMap.remove(oldKey);
- windowKeyToValueMap.put(newKey, value);
- keyToWindowsMap.remove(key, fromWindow);
- keyToWindowsMap.put(key, toWindow);
+ K key = fromWindow.getKey();
+ ImmutablePair<Window, K> oldKey = new ImmutablePair<Window, K>(fromWindow, key);
+ ImmutablePair<Window, K> newKey = new ImmutablePair<Window, K>(toWindow, key);
+ V value = windowKeyToValueMap.get(oldKey);
+ if (value == null) {
+ throw new NoSuchElementException();
}
+ windowKeyToValueMap.remove(oldKey);
+ windowKeyToValueMap.put(newKey, value);
+ keyToWindowsMap.remove(key, fromWindow);
+ keyToWindowsMap.put(key, toWindow);
windowToKeysMap.removeAll(fromWindow);
+ windowToKeysMap.put(toWindow, key);
}
@Override
[2/2] apex-malhar git commit: APEXMALHAR-2305 #resolve Mirror the
proto-session window behavior described in the streaming 102 blog
Posted by hs...@apache.org.
APEXMALHAR-2305 #resolve Mirror the proto-session window behavior described in the streaming 102 blog
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5131bee0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5131bee0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5131bee0
Branch: refs/heads/master
Commit: 5131bee0ed0c519294d87f2c460556d0a63f284e
Parents: 2f308aa
Author: David Yan <da...@datatorrent.com>
Authored: Tue Oct 18 15:41:28 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Oct 19 12:36:20 2016 -0700
----------------------------------------------------------------------
.../apex/malhar/lib/window/SessionWindowedStorage.java | 6 +++---
.../lib/window/impl/InMemorySessionWindowedStorage.java | 2 +-
.../lib/window/impl/KeyedWindowedOperatorImpl.java | 11 ++++++-----
.../lib/window/impl/SpillableSessionWindowedStorage.java | 2 +-
.../apex/malhar/lib/window/WindowedOperatorTest.java | 8 ++++----
5 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
index 3e25d15..b2accbb 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/SessionWindowedStorage.java
@@ -44,10 +44,10 @@ public interface SessionWindowedStorage<K, V> extends WindowedStorage.WindowedKe
void migrateWindow(Window.SessionWindow<K> fromWindow, Window.SessionWindow<K> toWindow);
/**
- * Given the key, the timestamp and the gap, gets the data that falls into timestamp +/- gap.
- * This is used for getting the entry the data given the timestamp belongs to, and for determining whether to merge
+ * Given the key, the timestamp and the gap, gets the windows that overlap with the range from timestamp to (timestamp + gap).
+ * This is used for getting the windows the timestamp belongs to, and for determining whether to merge
* session windows.
- * This should only return at most two entries if sessions have been merged appropriately.
+ * This should only return at most two windows if sessions have been merged appropriately.
*
* @param key the key
* @param timestamp the timestamp
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
index 906b1b9..0247cbc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemorySessionWindowedStorage.java
@@ -73,7 +73,7 @@ public class InMemorySessionWindowedStorage<K, V> extends InMemoryWindowedKeyedS
Window.SessionWindow<K> refWindow = new Window.SessionWindow<>(key, timestamp, 1);
Window.SessionWindow<K> floor = sessionWindows.floor(refWindow);
if (floor != null) {
- if (floor.getBeginTimestamp() + floor.getDurationMillis() + gap > timestamp) {
+ if (floor.getBeginTimestamp() + floor.getDurationMillis() > timestamp) {
results.add(new AbstractMap.SimpleEntry<>(floor, map.get(floor).get(key)));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index a33133b..b01fe61 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -60,12 +60,13 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey();
WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage;
- Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
+ long minGapMillis = sessionWindowOption.getMinGap().getMillis();
+ Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, minGapMillis);
Window.SessionWindow<KeyT> sessionWindowToAssign;
switch (sessionEntries.size()) {
case 0: {
// There are no existing windows within the minimum gap. Create a new session window
- Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
+ Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, minGapMillis);
windowStateMap.put(sessionWindow, new WindowState());
sessionWindowToAssign = sessionWindow;
break;
@@ -74,7 +75,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
// There is already one existing window within the minimum gap. See whether we need to extend the time of that window
Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next();
Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
- if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
+ if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp + minGapMillis <= sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
// The session window already covers the event
sessionWindowToAssign = sessionWindow;
} else {
@@ -86,7 +87,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
}
// create a new session window that covers the timestamp
long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
- long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1);
+ long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + minGapMillis);
Window.SessionWindow<KeyT> newSessionWindow =
new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
windowStateMap.remove(sessionWindow);
@@ -97,7 +98,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
break;
}
case 2: {
- // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows
+ // There are two windows that overlap the proto-session window of the timestamp. We need to merge the two windows
Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator();
Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next();
Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index 8779739..5d39930 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -106,7 +106,7 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
if (sessionWindows != null) {
for (Window.SessionWindow<K> window : sessionWindows) {
if (timestamp > window.getBeginTimestamp()) {
- if (window.getBeginTimestamp() + window.getDurationMillis() + gap > timestamp) {
+ if (window.getBeginTimestamp() + window.getDurationMillis() > timestamp) {
results.add(new AbstractMap.SimpleEntry<>(window, windowKeyToValueMap.get(new ImmutablePair<Window, K>(window, key))));
}
} else if (timestamp < window.getBeginTimestamp()) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5131bee0/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 15aba82..4a1cef0 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -438,7 +438,7 @@ public class WindowedOperatorTest
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window1.getBeginTimestamp());
- Assert.assertEquals(1, window1.getDurationMillis());
+ Assert.assertEquals(2000, window1.getDurationMillis());
Assert.assertEquals("a", window1.getKey());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(2L, out.getValue().getValue().longValue());
@@ -461,7 +461,7 @@ public class WindowedOperatorTest
Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window2.getBeginTimestamp());
- Assert.assertEquals(901, window2.getDurationMillis());
+ Assert.assertEquals(2900, window2.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(5L, out.getValue().getValue().longValue());
sink.clear();
@@ -474,7 +474,7 @@ public class WindowedOperatorTest
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(5000L, window3.getBeginTimestamp());
- Assert.assertEquals(1, window3.getDurationMillis());
+ Assert.assertEquals(2000, window3.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(4L, out.getValue().getValue().longValue());
sink.clear();
@@ -510,7 +510,7 @@ public class WindowedOperatorTest
Assert.assertEquals(1, out.getWindows().size());
Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next();
Assert.assertEquals(1100L, window4.getBeginTimestamp());
- Assert.assertEquals(3901, window4.getDurationMillis());
+ Assert.assertEquals(5900, window4.getDurationMillis());
Assert.assertEquals("a", out.getValue().getKey());
Assert.assertEquals(12L, out.getValue().getValue().longValue());