You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/04/21 23:04:46 UTC
[1/2] apex-core git commit: APEXCORE-649 Provide snapshot of DAG to
plugins instead of actual DAG
Repository: apex-core
Updated Branches:
refs/heads/master 6cb3e3510 -> cfe9cefed
APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d705ed43
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d705ed43
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d705ed43
Branch: refs/heads/master
Commit: d705ed433f3bc6750e4ff693196490d9e9b07061
Parents: 25e4c4c
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Mar 27 11:21:16 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Apr 21 12:09:30 2017 +0530
----------------------------------------------------------------------
.../stram/StreamingAppMasterService.java | 2 +
.../stram/StreamingContainerManager.java | 2 +
.../plugin/AbstractApexPluginDispatcher.java | 216 +++++++++++++++++++
.../AbstractDAGExecutionPluginContext.java | 9 +-
.../engine/plugin/ApexPluginDispatcher.java | 8 +
.../apex/engine/plugin/ApexPluginManager.java | 191 ----------------
.../plugin/DefaultApexPluginDispatcher.java | 4 +-
.../apache/apex/engine/plugin/DebugPlugin.java | 29 +--
.../apache/apex/engine/plugin/PluginTests.java | 25 +--
9 files changed, 242 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 4e4f501..2e88114 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -115,6 +115,7 @@ import com.datatorrent.stram.webapp.AppInfo;
import com.datatorrent.stram.webapp.StramWebApp;
import static java.lang.Thread.sleep;
+import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
/**
* Streaming Application Master
@@ -599,6 +600,7 @@ public class StreamingAppMasterService extends CompositeService
apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats);
dnmgr.apexPluginDispatcher = apexPluginDispatcher;
addService(apexPluginDispatcher);
+ apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan());
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 92fce54..6ec5267 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -181,6 +181,7 @@ import net.engio.mbassy.bus.config.BusConfiguration;
import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
+import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
/**
* Tracks topology provisioning/allocation to containers<p>
@@ -3081,6 +3082,7 @@ public class StreamingContainerManager implements PlanContext
recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request));
}
pm.applyChanges(StreamingContainerManager.this);
+ apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan());
LOG.info("Plan changes applied: {}", requests);
return null;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
new file mode 100644
index 0000000..2b96632
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
+import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
+import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.engine.api.plugin.PluginLocator;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.webapp.AppInfo;
+
+/**
+ * A default implementation for ApexPluginDispatcher. It handles common tasks, such as handler
+ * registrations. Actual dispatching is left for classes extending from it.
+ */
+public abstract class AbstractApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher
+{
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractApexPluginDispatcher.class);
+ protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList();
+ protected final StramAppContext appContext;
+ protected final StreamingContainerManager dmgr;
+ private final PluginLocator locator;
+ private final AppInfo.AppStats stats;
+ protected Configuration launchConfig;
+ protected FileContext fileContext;
+ protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
+ private volatile DAG clonedDAG = null;
+
+ public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
+ {
+ super(AbstractApexPluginDispatcher.class.getName());
+ this.locator = locator;
+ this.appContext = context;
+ this.dmgr = dmgr;
+ this.stats = stats;
+ LOG.debug("Creating apex service ");
+ }
+
+ private Configuration readLaunchConfiguration() throws IOException
+ {
+ Path appPath = new Path(appContext.getApplicationPath());
+ Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
+ try {
+ LOG.debug("Reading launch configuration file ");
+ URI uri = appPath.toUri();
+ Configuration config = new YarnConfiguration();
+ fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
+ FSDataInputStream is = fileContext.open(configFilePath);
+ config.addResource(is);
+ LOG.debug("Read launch configuration");
+ return config;
+ } catch (FileNotFoundException ex) {
+ LOG.warn("Configuration file not found {}", configFilePath);
+ return new Configuration();
+ }
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception
+ {
+ super.serviceInit(conf);
+ this.launchConfig = readLaunchConfiguration();
+ if (locator != null) {
+ Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig);
+ if (plugins != null) {
+ this.plugins.addAll(plugins);
+ for (DAGExecutionPlugin plugin : plugins) {
+ LOG.info("Detected plugin {}", plugin);
+ }
+ }
+ }
+
+ for (DAGExecutionPlugin plugin : plugins) {
+ plugin.setup(new PluginManagerImpl(plugin));
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception
+ {
+ for (DAGExecutionPlugin plugin : plugins) {
+ plugin.teardown();
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Keeps information about plugin and its registrations. Dispatcher use this
+ * information while delivering events to plugin.
+ */
+ protected class PluginInfo
+ {
+ private final DAGExecutionPlugin plugin;
+ private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
+
+ <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
+ {
+ registrationMap.put(registrationType, handler);
+ }
+
+ <T> Handler<T> get(RegistrationType<T> registrationType)
+ {
+ return (Handler<T>)registrationMap.get(registrationType);
+ }
+
+ public PluginInfo(DAGExecutionPlugin plugin)
+ {
+ this.plugin = plugin;
+ }
+
+ public DAGExecutionPlugin getPlugin()
+ {
+ return plugin;
+ }
+ }
+
+ PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
+ {
+ PluginInfo pInfo = pluginInfoMap.get(plugin);
+ if (pInfo == null) {
+ pInfo = new PluginInfo(plugin);
+ pluginInfoMap.put(plugin, pInfo);
+ }
+ return pInfo;
+ }
+
+ private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
+ {
+ PluginInfo pInfo = getPluginInfo(owner);
+ pInfo.put(type, handler);
+ }
+
+ /**
+ * A wrapper PluginManager to track registration from a plugin. with this plugin
+ * don't need to pass explicit owner argument during registration.
+ */
+ private class PluginManagerImpl extends AbstractDAGExecutionPluginContext
+ {
+ private final DAGExecutionPlugin owner;
+
+ PluginManagerImpl(DAGExecutionPlugin plugin)
+ {
+ super(appContext, dmgr, stats, launchConfig);
+ this.owner = plugin;
+ }
+
+ @Override
+ public <T> void register(RegistrationType<T> type, Handler<T> handler)
+ {
+ AbstractApexPluginDispatcher.this.register(type, handler, owner);
+ }
+
+ @Override
+ public DAG getDAG()
+ {
+ return clonedDAG;
+ }
+ }
+
+ /**
+ * Dispatch events to plugins.
+ * @param registrationType
+ * @param data
+ * @param <T>
+ */
+ protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data);
+
+ @Override
+ public <T> void dispatch(RegistrationType<T> registrationType, T data)
+ {
+ if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) {
+ clonedDAG = SerializationUtils.clone((DAG)data);
+ } else {
+ dispatchEvent(registrationType, data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
index 19ef91a..b17d5f8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
@@ -53,6 +53,9 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP
}
@Override
+ public abstract DAG getDAG();
+
+ @Override
public StramAppContext getApplicationContext()
{
return appContext;
@@ -65,12 +68,6 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP
}
@Override
- public DAG getDAG()
- {
- return dnmgr.getLogicalPlan();
- }
-
- @Override
public String getOperatorName(int id)
{
PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
index c6ef54d..234195f 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
@@ -21,7 +21,15 @@ package org.apache.apex.engine.plugin;
import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
import org.apache.hadoop.service.Service;
+import com.datatorrent.api.DAG;
+
public interface ApexPluginDispatcher extends Service
{
+
+ /**
+ * This is internal event, which is not delivered to the plugins.
+ */
+ RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>();
+
<T> void dispatch(RegistrationType<T> registrationType, T data);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
deleted file mode 100644
index 9f070a1..0000000
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.engine.plugin;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
-import org.apache.apex.engine.api.plugin.PluginLocator;
-import org.apache.commons.digester.plugins.PluginContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.stram.StramAppContext;
-import com.datatorrent.stram.StreamingContainerManager;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.webapp.AppInfo;
-
-/**
- * A default implementation for ApexPluginDispatcher. It handler common tasks such as per handler
- * registration. actual dispatching is left for classes extending from it.
- */
-public abstract class ApexPluginManager extends AbstractService
-{
- private static final Logger LOG = LoggerFactory.getLogger(ApexPluginManager.class);
- protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList();
- protected final StramAppContext appContext;
- protected final StreamingContainerManager dmgr;
- private final PluginLocator locator;
- private final AppInfo.AppStats stats;
- protected Configuration launchConfig;
- protected FileContext fileContext;
- protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
- protected PluginContext pluginContext;
-
- public ApexPluginManager(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
- {
- super(ApexPluginManager.class.getName());
- this.locator = locator;
- this.appContext = context;
- this.dmgr = dmgr;
- this.stats = stats;
- LOG.debug("Creating apex service ");
- }
-
- private Configuration readLaunchConfiguration() throws IOException
- {
- Path appPath = new Path(appContext.getApplicationPath());
- Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
- try {
- LOG.debug("Reading launch configuration file ");
- URI uri = appPath.toUri();
- Configuration config = new YarnConfiguration();
- fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
- FSDataInputStream is = fileContext.open(configFilePath);
- config.addResource(is);
- LOG.debug("Read launch configuration");
- return config;
- } catch (FileNotFoundException ex) {
- LOG.warn("Configuration file not found {}", configFilePath);
- return new Configuration();
- }
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception
- {
- super.serviceInit(conf);
- this.launchConfig = readLaunchConfiguration();
- if (locator != null) {
- Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig);
- if (plugins != null) {
- this.plugins.addAll(plugins);
- for (DAGExecutionPlugin plugin : plugins) {
- LOG.info("Detected plugin {}", plugin);
- }
- }
- }
-
- for (DAGExecutionPlugin plugin : plugins) {
- plugin.setup(new PluginManagerImpl(plugin));
- }
- }
-
- @Override
- protected void serviceStop() throws Exception
- {
- for (DAGExecutionPlugin plugin : plugins) {
- plugin.teardown();
- }
- super.serviceStop();
- }
-
- /**
- * Keeps information about plugin and its registrations. Dispatcher use this
- * information while delivering events to plugin.
- */
- class PluginInfo
- {
- private final DAGExecutionPlugin plugin;
- private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
-
- <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
- {
- registrationMap.put(registrationType, handler);
- }
-
- <T> Handler<T> get(RegistrationType<T> registrationType)
- {
- return (Handler<T>)registrationMap.get(registrationType);
- }
-
- public PluginInfo(DAGExecutionPlugin plugin)
- {
- this.plugin = plugin;
- }
-
- public DAGExecutionPlugin getPlugin()
- {
- return plugin;
- }
- }
-
- PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
- {
- PluginInfo pInfo = pluginInfoMap.get(plugin);
- if (pInfo == null) {
- pInfo = new PluginInfo(plugin);
- pluginInfoMap.put(plugin, pInfo);
- }
- return pInfo;
- }
-
- public <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
- {
- PluginInfo pInfo = getPluginInfo(owner);
- pInfo.put(type, handler);
- }
-
- /**
- * A wrapper PluginManager to track registration from a plugin. with this plugin
- * don't need to pass explicit owner argument during registration.
- */
- class PluginManagerImpl extends AbstractDAGExecutionPluginContext
- {
- private final DAGExecutionPlugin owner;
-
- PluginManagerImpl(DAGExecutionPlugin plugin)
- {
- super(appContext, dmgr, stats, launchConfig);
- this.owner = plugin;
- }
-
- @Override
- public <T> void register(RegistrationType<T> type, Handler<T> handler)
- {
- ApexPluginManager.this.register(type, handler, owner);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
index 0eee85e..bea011c 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
@@ -43,7 +43,7 @@ import com.datatorrent.stram.webapp.AppInfo;
* service to process the event asynchronously. A separate task {@link DefaultApexPluginDispatcher.ProcessEventTask}
* is created to process an event and then submitted to the executor for execution.
*/
-public class DefaultApexPluginDispatcher extends ApexPluginManager implements ApexPluginDispatcher
+public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher
{
private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class);
private int qsize = 4098;
@@ -56,7 +56,7 @@ public class DefaultApexPluginDispatcher extends ApexPluginManager implements Ap
}
@Override
- public <T> void dispatch(RegistrationType<T> registrationType, T data)
+ protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data)
{
if (executorService != null) {
executorService.submit(new ProcessEventTask<>(registrationType, data));
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 4a64b10..5b8ca11 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -18,9 +18,8 @@
*/
package org.apache.apex.engine.plugin;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
@@ -38,8 +37,7 @@ public class DebugPlugin implements DAGExecutionPlugin
private int eventCount = 0;
private int heartbeatCount = 0;
private int commitCount = 0;
- private final Lock lock = new ReentrantLock();
- final Condition events = lock.newCondition();
+ CountDownLatch latch = new CountDownLatch(3);
@Override
public void setup(DAGExecutionPluginContext context)
@@ -49,10 +47,8 @@ public class DebugPlugin implements DAGExecutionPlugin
@Override
public void handle(StramEvent stramEvent)
{
- lock();
eventCount++;
- events.signal();
- unlock();
+ latch.countDown();
}
});
@@ -61,10 +57,8 @@ public class DebugPlugin implements DAGExecutionPlugin
@Override
public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
{
- lock();
heartbeatCount++;
- events.signal();
- unlock();
+ latch.countDown();
}
});
@@ -73,10 +67,8 @@ public class DebugPlugin implements DAGExecutionPlugin
@Override
public void handle(Long aLong)
{
- lock();
commitCount++;
- events.signal();
- unlock();
+ latch.countDown();
}
});
}
@@ -102,13 +94,8 @@ public class DebugPlugin implements DAGExecutionPlugin
return commitCount;
}
- void lock()
+ public void waitForEventDelivery(long timeout) throws InterruptedException
{
- this.lock.lock();
- }
-
- void unlock()
- {
- this.lock.unlock();
+ latch.await(timeout, TimeUnit.SECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index fda607f..84dc4ba 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -19,7 +19,6 @@
package org.apache.apex.engine.plugin;
import java.util.Collection;
-import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
@@ -87,7 +86,6 @@ public class PluginTests
ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator,
new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null);
pluginManager.init(new Configuration());
- int count = debugPlugin.getEventCount();
pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG)
{
@Override
@@ -96,30 +94,9 @@ public class PluginTests
return "TestEvent";
}
});
-
- debugPlugin.lock();
- while (debugPlugin.getEventCount() == count) {
- debugPlugin.events.await(5, TimeUnit.SECONDS);
- }
- debugPlugin.unlock();
-
- Assert.assertEquals("Total stram event received ", debugPlugin.getEventCount(), 1);
-
- count = debugPlugin.getCommitCount();
pluginManager.dispatch(COMMIT_EVENT, new Long(1234));
- debugPlugin.lock();
- while (debugPlugin.getCommitCount() == count) {
- debugPlugin.events.await(5, TimeUnit.SECONDS);
- }
- debugPlugin.unlock();
-
- count = debugPlugin.getHeartbeatCount();
pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat());
- debugPlugin.lock();
- while (debugPlugin.getHeartbeatCount() == count) {
- debugPlugin.events.await(5, TimeUnit.SECONDS);
- }
- debugPlugin.unlock();
+ debugPlugin.waitForEventDelivery(10);
pluginManager.stop();
Assert.assertEquals(1, debugPlugin.getEventCount());
[2/2] apex-core git commit: Merge branch 'APEXCORE-649' of
github.com:tushargosavi/apex-core
Posted by pr...@apache.org.
Merge branch 'APEXCORE-649' of github.com:tushargosavi/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/cfe9cefe
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/cfe9cefe
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/cfe9cefe
Branch: refs/heads/master
Commit: cfe9cefed1400f83dc95a17a1f1f8de921d552b1
Parents: 6cb3e35 d705ed4
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Apr 21 14:51:47 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Fri Apr 21 14:51:47 2017 -0700
----------------------------------------------------------------------
.../stram/StreamingAppMasterService.java | 2 +
.../stram/StreamingContainerManager.java | 2 +
.../plugin/AbstractApexPluginDispatcher.java | 216 +++++++++++++++++++
.../AbstractDAGExecutionPluginContext.java | 9 +-
.../engine/plugin/ApexPluginDispatcher.java | 8 +
.../apex/engine/plugin/ApexPluginManager.java | 191 ----------------
.../plugin/DefaultApexPluginDispatcher.java | 4 +-
.../apache/apex/engine/plugin/DebugPlugin.java | 29 +--
.../apache/apex/engine/plugin/PluginTests.java | 25 +--
9 files changed, 242 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/cfe9cefe/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------