You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/18 18:39:03 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2359 #resolve #comment Optimise fire trigger to avoid going through all data

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0885bfad2 -> 4cbbb7507


APEXMALHAR-2359 #resolve #comment Optimise fire trigger to avoid going through all data


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/875e47c7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/875e47c7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/875e47c7

Branch: refs/heads/master
Commit: 875e47c7a74b7740c28b8fddd12bc990e9f8ae0a
Parents: ca6995c
Author: brightchen <br...@datatorrent.com>
Authored: Tue Nov 29 15:05:09 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Jan 17 20:55:43 2017 -0800

----------------------------------------------------------------------
 .../AbstractWindowedOperatorBenchmarkApp.java   |  5 ++
 .../KeyedWindowedOperatorBenchmarkApp.java      | 24 ++++++
 .../window/impl/AbstractWindowedOperator.java   | 11 ++-
 .../window/impl/KeyedWindowedOperatorImpl.java  | 77 +++++++++++++++++---
 4 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 09f7653..64af9f9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -106,6 +106,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
       windowedOperator.setDataStorage(createDataStorage(sccImpl));
       windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl));
       windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
+      setUpdatedKeyStorage(windowedOperator, conf, sccImpl);
       windowedOperator.setAccumulation(createAccumulation());
 
       windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
@@ -121,6 +122,10 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
     }
   }
 
+  protected void setUpdatedKeyStorage(O windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+  {
+  }
+
   protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl);
 
   protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
index 7b2085d..5a9c955 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -22,17 +22,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.apex.malhar.lib.window.accumulation.Count;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.lib.util.KeyValPair;
 
 public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
@@ -51,6 +57,12 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
     dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
   }
 
+  @Override
+  protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+  {
+    windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl));
+  }
+
   protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl
   {
     private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class);
@@ -124,6 +136,18 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
     return dataStorage;
   }
 
+  protected SpillableSetMultimapImpl<Window, String> createUpdatedDataStorage(Configuration conf,
+      SpillableComplexComponentImpl sccImpl)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+
+    SpillableSetMultimapImpl<Window, String> dataStorage = new SpillableSetMultimapImpl<Window, String>(store,
+        new byte[] {(byte)1}, 0, new GenericSerde<Window>(), new GenericSerde<String>());
+    return dataStorage;
+  }
+
   @Override
   protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index e8ff622..4ba81b3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -83,9 +83,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   protected long currentWatermark = -1;
   private boolean triggerAtWatermark;
   protected long earlyTriggerCount;
-  private long earlyTriggerMillis;
+  protected long earlyTriggerMillis;
   protected long lateTriggerCount;
-  private long lateTriggerMillis;
+  protected long lateTriggerMillis;
   private long currentDerivedTimestamp = -1;
   private long timeIncrement;
   protected long fixedWatermarkMillis = -1;
@@ -543,11 +543,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     }
   }
 
+  protected boolean isFiringOnlyUpdatedPanes()
+  {
+    return triggerOption.isFiringOnlyUpdatedPanes();
+  }
+
   @Override
   public void fireTrigger(Window window, WindowState windowState)
   {
     if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-      fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
+      fireRetractionTrigger(window, isFiringOnlyUpdatedPanes());
     }
     fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
     windowState.lastTriggerFiredTime = currentDerivedTimestamp;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index deb718b..2bfff03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.TriggerOption;
@@ -33,6 +34,7 @@ import org.apache.apex.malhar.lib.window.WindowState;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.lib.util.KeyValPair;
 
 /**
@@ -50,6 +52,18 @@ import com.datatorrent.lib.util.KeyValPair;
 public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
     extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>>
 {
+  private SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (useUpdatedKeyStorage()) {
+      updatedKeyStorage.getStore().setup(context);
+      updatedKeyStorage.setup(context);
+    }
+
+    super.setup(context);
+  }
 
   @Override
   protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple)
@@ -135,6 +149,20 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   }
 
   @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    if (useUpdatedKeyStorage()) {
+      updatedKeyStorage.endWindow();
+    }
+  }
+
+  private boolean useUpdatedKeyStorage()
+  {
+    return updatedKeyStorage != null && isFiringOnlyUpdatedPanes();
+  }
+
+  @Override
   public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputValT>> tuple)
   {
     KeyValPair<KeyT, InputValT> kvData = tuple.getValue();
@@ -145,24 +173,43 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
       if (accum == null) {
         accum = accumulation.defaultAccumulatedValue();
       }
-      dataStorage.put(window, key, accumulation.accumulate(accum, kvData.getValue()));
+
+      InputValT inputValue = kvData.getValue();
+      AccumT newValue = accumulation.accumulate(accum, inputValue);
+      if ((earlyTriggerMillis > 0 || lateTriggerMillis > 0 || earlyTriggerCount > 0 || lateTriggerCount > 0) && useUpdatedKeyStorage()) {
+        updatedKeyStorage.put(window, key);
+      }
+
+      dataStorage.put(window, key, newValue);
     }
   }
 
   @Override
   public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
-    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
-      OutputValT outputVal = accumulation.getOutput(entry.getValue());
-      if (fireOnlyUpdatedPanes && retractionStorage != null) {
-        OutputValT oldValue = retractionStorage.get(window, entry.getKey());
-        if (oldValue != null && oldValue.equals(outputVal)) {
-          continue;
+    if (useUpdatedKeyStorage()) {
+      for (KeyT key : updatedKeyStorage.get(window)) {
+        OutputValT outputVal = accumulation.getOutput(dataStorage.get(window, key));
+        if (retractionStorage != null) {
+          OutputValT oldValue = retractionStorage.get(window, key);
+          if (oldValue != null && oldValue.equals(outputVal)) {
+            continue;
+          }
+        }
+        output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(key, outputVal)));
+        if (retractionStorage != null) {
+          retractionStorage.put(window, key, outputVal);
         }
       }
-      output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
-      if (retractionStorage != null) {
-        retractionStorage.put(window, entry.getKey(), outputVal);
+      updatedKeyStorage.removeAll(window);
+    } else {
+      for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
+        OutputValT outputVal = accumulation.getOutput(entry.getValue());
+
+        output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
+        if (retractionStorage != null) {
+          retractionStorage.put(window, entry.getKey(), outputVal);
+        }
       }
     }
   }
@@ -187,4 +234,14 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
     }
   }
 
+  public SpillableSetMultimapImpl<Window, KeyT> getUpdatedKeyStorage()
+  {
+    return updatedKeyStorage;
+  }
+
+  public void setUpdatedKeyStorage(SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage)
+  {
+    this.updatedKeyStorage = updatedKeyStorage;
+  }
+
 }


[2/2] apex-malhar git commit: Merge commit 'refs/pull/518/head' of github.com:apache/apex-malhar

Posted by da...@apache.org.
Merge commit 'refs/pull/518/head' of github.com:apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4cbbb750
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4cbbb750
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4cbbb750

Branch: refs/heads/master
Commit: 4cbbb7507b6efc35e5e9b8058bd63a69d389c4ec
Parents: 0885bfa 875e47c
Author: David Yan <da...@apache.org>
Authored: Wed Jan 18 10:38:38 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 18 10:38:38 2017 -0800

----------------------------------------------------------------------
 .../AbstractWindowedOperatorBenchmarkApp.java   |  5 ++
 .../KeyedWindowedOperatorBenchmarkApp.java      | 24 ++++++
 .../window/impl/AbstractWindowedOperator.java   | 11 ++-
 .../window/impl/KeyedWindowedOperatorImpl.java  | 77 +++++++++++++++++---
 4 files changed, 104 insertions(+), 13 deletions(-)
----------------------------------------------------------------------