You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/05/16 12:54:31 UTC

apex-core git commit: APEXCORE-720 Update cloned LogicalPlan in Context before discovery of plugins

Repository: apex-core
Updated Branches:
  refs/heads/master e92741474 -> 899f4cb0a


APEXCORE-720 Update cloned LogicalPlan in Context before discovery of plugins


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/899f4cb0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/899f4cb0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/899f4cb0

Branch: refs/heads/master
Commit: 899f4cb0a01d799c8a3f6905ee98429bbde2bc82
Parents: e927414
Author: Chinmay Kolhatkar <ch...@apache.org>
Authored: Fri May 12 16:22:24 2017 +0530
Committer: Chinmay Kolhatkar <ch...@apache.org>
Committed: Tue May 16 16:58:24 2017 +0530

----------------------------------------------------------------------
 .../apex/engine/plugin/AbstractApexPluginDispatcher.java  | 10 ++++------
 .../java/org/apache/apex/engine/plugin/DebugPlugin.java   | 10 ++++++++++
 .../java/org/apache/apex/engine/plugin/PluginTests.java   |  5 ++++-
 3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 5e468f5..a4aca46 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -170,12 +170,10 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple
   @Override
   public void dispatch(Event event)
   {
-    if (!plugins.isEmpty()) {
-      if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
-        clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
-      } else if (event instanceof DAGExecutionEvent) {
-        dispatchExecutionEvent((DAGExecutionEvent)event);
-      }
+    if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+      clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+    } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) {
+      dispatchExecutionEvent((DAGExecutionEvent)event);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 833d69f..654a4ce 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 
+import com.datatorrent.api.DAG;
+
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
@@ -39,10 +41,13 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex
   private int heartbeatCount = 0;
   private int commitCount = 0;
   CountDownLatch latch = new CountDownLatch(3);
+  private Context context;
 
   @Override
   public void setup(DAGExecutionPlugin.Context context)
   {
+    this.context = context;
+
     context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
     {
       @Override
@@ -102,4 +107,9 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex
   {
     latch.await(timeout, TimeUnit.SECONDS);
   }
+
+  public DAG getLogicalPlan()
+  {
+    return context.getDAG();
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 140dc65..34589b0 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.Attribute;
 import com.datatorrent.stram.api.StramEvent;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.support.StramTestSupport;
 
 public class PluginTests
@@ -93,12 +94,14 @@ public class PluginTests
     }));
     pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
     pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
+    LogicalPlan plan = new LogicalPlan();
+    pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan));
     debugPlugin.waitForEventDelivery(10);
     pluginManager.stop();
 
     Assert.assertEquals(1, debugPlugin.getEventCount());
     Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
     Assert.assertEquals(1, debugPlugin.getCommitCount());
+    Assert.assertEquals(plan, debugPlugin.getLogicalPlan());
   }
-
 }