You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/18 18:39:03 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2359 #resolve #comment
Optimise fire trigger to avoid going through all data
Repository: apex-malhar
Updated Branches:
refs/heads/master 0885bfad2 -> 4cbbb7507
APEXMALHAR-2359 #resolve #comment Optimise fire trigger to avoid going through all data
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/875e47c7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/875e47c7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/875e47c7
Branch: refs/heads/master
Commit: 875e47c7a74b7740c28b8fddd12bc990e9f8ae0a
Parents: ca6995c
Author: brightchen <br...@datatorrent.com>
Authored: Tue Nov 29 15:05:09 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Jan 17 20:55:43 2017 -0800
----------------------------------------------------------------------
.../AbstractWindowedOperatorBenchmarkApp.java | 5 ++
.../KeyedWindowedOperatorBenchmarkApp.java | 24 ++++++
.../window/impl/AbstractWindowedOperator.java | 11 ++-
.../window/impl/KeyedWindowedOperatorImpl.java | 77 +++++++++++++++++---
4 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 09f7653..64af9f9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -106,6 +106,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
windowedOperator.setDataStorage(createDataStorage(sccImpl));
windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl));
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
+ setUpdatedKeyStorage(windowedOperator, conf, sccImpl);
windowedOperator.setAccumulation(createAccumulation());
windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
@@ -121,6 +122,10 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
}
}
+ protected void setUpdatedKeyStorage(O windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+ {
+ }
+
protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl);
protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
index 7b2085d..5a9c955 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -22,17 +22,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.apex.malhar.lib.window.accumulation.Count;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.util.KeyValPair;
public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
@@ -51,6 +57,12 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
}
+ @Override
+ protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+ {
+ windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl));
+ }
+
protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl
{
private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class);
@@ -124,6 +136,18 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
return dataStorage;
}
+ protected SpillableSetMultimapImpl<Window, String> createUpdatedDataStorage(Configuration conf,
+ SpillableComplexComponentImpl sccImpl)
+ {
+ String basePath = getStoreBasePath(conf);
+ ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+ ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+
+ SpillableSetMultimapImpl<Window, String> dataStorage = new SpillableSetMultimapImpl<Window, String>(store,
+ new byte[] {(byte)1}, 0, new GenericSerde<Window>(), new GenericSerde<String>());
+ return dataStorage;
+ }
+
@Override
protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index e8ff622..4ba81b3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -83,9 +83,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
protected long currentWatermark = -1;
private boolean triggerAtWatermark;
protected long earlyTriggerCount;
- private long earlyTriggerMillis;
+ protected long earlyTriggerMillis;
protected long lateTriggerCount;
- private long lateTriggerMillis;
+ protected long lateTriggerMillis;
private long currentDerivedTimestamp = -1;
private long timeIncrement;
protected long fixedWatermarkMillis = -1;
@@ -543,11 +543,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
+ protected boolean isFiringOnlyUpdatedPanes()
+ {
+ return triggerOption.isFiringOnlyUpdatedPanes();
+ }
+
@Override
public void fireTrigger(Window window, WindowState windowState)
{
if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
+ fireRetractionTrigger(window, isFiringOnlyUpdatedPanes());
}
fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
windowState.lastTriggerFiredTime = currentDerivedTimestamp;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index deb718b..2bfff03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
+import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
import org.apache.apex.malhar.lib.window.TriggerOption;
@@ -33,6 +34,7 @@ import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.Context;
import com.datatorrent.lib.util.KeyValPair;
/**
@@ -50,6 +52,18 @@ import com.datatorrent.lib.util.KeyValPair;
public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>>
{
+ private SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (useUpdatedKeyStorage()) {
+ updatedKeyStorage.getStore().setup(context);
+ updatedKeyStorage.setup(context);
+ }
+
+ super.setup(context);
+ }
@Override
protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple)
@@ -135,6 +149,20 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
}
@Override
+ public void endWindow()
+ {
+ super.endWindow();
+ if (useUpdatedKeyStorage()) {
+ updatedKeyStorage.endWindow();
+ }
+ }
+
+ private boolean useUpdatedKeyStorage()
+ {
+ return updatedKeyStorage != null && isFiringOnlyUpdatedPanes();
+ }
+
+ @Override
public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputValT>> tuple)
{
KeyValPair<KeyT, InputValT> kvData = tuple.getValue();
@@ -145,24 +173,43 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
if (accum == null) {
accum = accumulation.defaultAccumulatedValue();
}
- dataStorage.put(window, key, accumulation.accumulate(accum, kvData.getValue()));
+
+ InputValT inputValue = kvData.getValue();
+ AccumT newValue = accumulation.accumulate(accum, inputValue);
+ if ((earlyTriggerMillis > 0 || lateTriggerMillis > 0 || earlyTriggerCount > 0 || lateTriggerCount > 0) && useUpdatedKeyStorage()) {
+ updatedKeyStorage.put(window, key);
+ }
+
+ dataStorage.put(window, key, newValue);
}
}
@Override
public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
{
- for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
- OutputValT outputVal = accumulation.getOutput(entry.getValue());
- if (fireOnlyUpdatedPanes && retractionStorage != null) {
- OutputValT oldValue = retractionStorage.get(window, entry.getKey());
- if (oldValue != null && oldValue.equals(outputVal)) {
- continue;
+ if (useUpdatedKeyStorage()) {
+ for (KeyT key : updatedKeyStorage.get(window)) {
+ OutputValT outputVal = accumulation.getOutput(dataStorage.get(window, key));
+ if (retractionStorage != null) {
+ OutputValT oldValue = retractionStorage.get(window, key);
+ if (oldValue != null && oldValue.equals(outputVal)) {
+ continue;
+ }
+ }
+ output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(key, outputVal)));
+ if (retractionStorage != null) {
+ retractionStorage.put(window, key, outputVal);
}
}
- output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
- if (retractionStorage != null) {
- retractionStorage.put(window, entry.getKey(), outputVal);
+ updatedKeyStorage.removeAll(window);
+ } else {
+ for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
+ OutputValT outputVal = accumulation.getOutput(entry.getValue());
+
+ output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
+ if (retractionStorage != null) {
+ retractionStorage.put(window, entry.getKey(), outputVal);
+ }
}
}
}
@@ -187,4 +234,14 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
}
}
+ public SpillableSetMultimapImpl<Window, KeyT> getUpdatedKeyStorage()
+ {
+ return updatedKeyStorage;
+ }
+
+ public void setUpdatedKeyStorage(SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage)
+ {
+ this.updatedKeyStorage = updatedKeyStorage;
+ }
+
}
[2/2] apex-malhar git commit: Merge commit 'refs/pull/518/head' of
github.com:apache/apex-malhar
Posted by da...@apache.org.
Merge commit 'refs/pull/518/head' of github.com:apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4cbbb750
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4cbbb750
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4cbbb750
Branch: refs/heads/master
Commit: 4cbbb7507b6efc35e5e9b8058bd63a69d389c4ec
Parents: 0885bfa 875e47c
Author: David Yan <da...@apache.org>
Authored: Wed Jan 18 10:38:38 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 18 10:38:38 2017 -0800
----------------------------------------------------------------------
.../AbstractWindowedOperatorBenchmarkApp.java | 5 ++
.../KeyedWindowedOperatorBenchmarkApp.java | 24 ++++++
.../window/impl/AbstractWindowedOperator.java | 11 ++-
.../window/impl/KeyedWindowedOperatorImpl.java | 77 +++++++++++++++++---
4 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------