You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/05/16 12:54:31 UTC
apex-core git commit: APEXCORE-720 Update cloned LogicalPlan in
Context before discovery of plugins
Repository: apex-core
Updated Branches:
refs/heads/master e92741474 -> 899f4cb0a
APEXCORE-720 Update cloned LogicalPlan in Context before discovery of plugins
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/899f4cb0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/899f4cb0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/899f4cb0
Branch: refs/heads/master
Commit: 899f4cb0a01d799c8a3f6905ee98429bbde2bc82
Parents: e927414
Author: Chinmay Kolhatkar <ch...@apache.org>
Authored: Fri May 12 16:22:24 2017 +0530
Committer: Chinmay Kolhatkar <ch...@apache.org>
Committed: Tue May 16 16:58:24 2017 +0530
----------------------------------------------------------------------
.../apex/engine/plugin/AbstractApexPluginDispatcher.java | 10 ++++------
.../java/org/apache/apex/engine/plugin/DebugPlugin.java | 10 ++++++++++
.../java/org/apache/apex/engine/plugin/PluginTests.java | 5 ++++-
3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 5e468f5..a4aca46 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -170,12 +170,10 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple
@Override
public void dispatch(Event event)
{
- if (!plugins.isEmpty()) {
- if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
- clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
- } else if (event instanceof DAGExecutionEvent) {
- dispatchExecutionEvent((DAGExecutionEvent)event);
- }
+ if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+ clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+ } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) {
+ dispatchExecutionEvent((DAGExecutionEvent)event);
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 833d69f..654a4ce 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
+import com.datatorrent.api.DAG;
+
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
@@ -39,10 +41,13 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex
private int heartbeatCount = 0;
private int commitCount = 0;
CountDownLatch latch = new CountDownLatch(3);
+ private Context context;
@Override
public void setup(DAGExecutionPlugin.Context context)
{
+ this.context = context;
+
context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
{
@Override
@@ -102,4 +107,9 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex
{
latch.await(timeout, TimeUnit.SECONDS);
}
+
+ public DAG getLogicalPlan()
+ {
+ return context.getDAG();
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 140dc65..34589b0 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Attribute;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
public class PluginTests
@@ -93,12 +94,14 @@ public class PluginTests
}));
pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
+ LogicalPlan plan = new LogicalPlan();
+ pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan));
debugPlugin.waitForEventDelivery(10);
pluginManager.stop();
Assert.assertEquals(1, debugPlugin.getEventCount());
Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
Assert.assertEquals(1, debugPlugin.getCommitCount());
+ Assert.assertEquals(plan, debugPlugin.getLogicalPlan());
}
-
}