You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/04/21 23:04:46 UTC

[1/2] apex-core git commit: APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG

Repository: apex-core
Updated Branches:
  refs/heads/master 6cb3e3510 -> cfe9cefed


APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d705ed43
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d705ed43
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d705ed43

Branch: refs/heads/master
Commit: d705ed433f3bc6750e4ff693196490d9e9b07061
Parents: 25e4c4c
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Mar 27 11:21:16 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Apr 21 12:09:30 2017 +0530

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        |   2 +
 .../stram/StreamingContainerManager.java        |   2 +
 .../plugin/AbstractApexPluginDispatcher.java    | 216 +++++++++++++++++++
 .../AbstractDAGExecutionPluginContext.java      |   9 +-
 .../engine/plugin/ApexPluginDispatcher.java     |   8 +
 .../apex/engine/plugin/ApexPluginManager.java   | 191 ----------------
 .../plugin/DefaultApexPluginDispatcher.java     |   4 +-
 .../apache/apex/engine/plugin/DebugPlugin.java  |  29 +--
 .../apache/apex/engine/plugin/PluginTests.java  |  25 +--
 9 files changed, 242 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 4e4f501..2e88114 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -115,6 +115,7 @@ import com.datatorrent.stram.webapp.AppInfo;
 import com.datatorrent.stram.webapp.StramWebApp;
 
 import static java.lang.Thread.sleep;
+import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
 
 /**
  * Streaming Application Master
@@ -599,6 +600,7 @@ public class StreamingAppMasterService extends CompositeService
     apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats);
     dnmgr.apexPluginDispatcher = apexPluginDispatcher;
     addService(apexPluginDispatcher);
+    apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 92fce54..6ec5267 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -181,6 +181,7 @@ import net.engio.mbassy.bus.config.BusConfiguration;
 import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
+import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
 
 /**
  * Tracks topology provisioning/allocation to containers<p>
@@ -3081,6 +3082,7 @@ public class StreamingContainerManager implements PlanContext
         recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request));
       }
       pm.applyChanges(StreamingContainerManager.this);
+      apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan());
       LOG.info("Plan changes applied: {}", requests);
       return null;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
new file mode 100644
index 0000000..2b96632
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.plugin;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
+import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
+import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.engine.api.plugin.PluginLocator;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.webapp.AppInfo;
+
+/**
+ * A default implementation for ApexPluginDispatcher. It handles common tasks, such as handler
+ * registrations. Actual dispatching is left for classes extending from it.
+ */
+public abstract class AbstractApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher
+{
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractApexPluginDispatcher.class);
+  protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList();
+  protected final StramAppContext appContext;
+  protected final StreamingContainerManager dmgr;
+  private final PluginLocator locator;
+  private final AppInfo.AppStats stats;
+  protected Configuration launchConfig;
+  protected FileContext fileContext;
+  protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
+  private volatile DAG clonedDAG = null;
+
+  public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
+  {
+    super(AbstractApexPluginDispatcher.class.getName());
+    this.locator = locator;
+    this.appContext = context;
+    this.dmgr = dmgr;
+    this.stats = stats;
+    LOG.debug("Creating apex service ");
+  }
+
+  private Configuration readLaunchConfiguration() throws IOException
+  {
+    Path appPath = new Path(appContext.getApplicationPath());
+    Path  configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
+    try {
+      LOG.debug("Reading launch configuration file ");
+      URI uri = appPath.toUri();
+      Configuration config = new YarnConfiguration();
+      fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
+      FSDataInputStream is = fileContext.open(configFilePath);
+      config.addResource(is);
+      LOG.debug("Read launch configuration");
+      return config;
+    } catch (FileNotFoundException ex) {
+      LOG.warn("Configuration file not found {}", configFilePath);
+      return new Configuration();
+    }
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception
+  {
+    super.serviceInit(conf);
+    this.launchConfig = readLaunchConfiguration();
+    if (locator != null) {
+      Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig);
+      if (plugins != null) {
+        this.plugins.addAll(plugins);
+        for (DAGExecutionPlugin plugin : plugins) {
+          LOG.info("Detected plugin {}", plugin);
+        }
+      }
+    }
+
+    for (DAGExecutionPlugin plugin : plugins) {
+      plugin.setup(new PluginManagerImpl(plugin));
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception
+  {
+    for (DAGExecutionPlugin plugin : plugins) {
+      plugin.teardown();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Keeps information about plugin and its registrations. Dispatcher use this
+   * information while delivering events to plugin.
+   */
+  protected class PluginInfo
+  {
+    private final DAGExecutionPlugin plugin;
+    private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
+
+    <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
+    {
+      registrationMap.put(registrationType, handler);
+    }
+
+    <T> Handler<T> get(RegistrationType<T> registrationType)
+    {
+      return (Handler<T>)registrationMap.get(registrationType);
+    }
+
+    public PluginInfo(DAGExecutionPlugin plugin)
+    {
+      this.plugin = plugin;
+    }
+
+    public DAGExecutionPlugin getPlugin()
+    {
+      return plugin;
+    }
+  }
+
+  PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
+  {
+    PluginInfo pInfo = pluginInfoMap.get(plugin);
+    if (pInfo == null) {
+      pInfo = new PluginInfo(plugin);
+      pluginInfoMap.put(plugin, pInfo);
+    }
+    return pInfo;
+  }
+
+  private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
+  {
+    PluginInfo pInfo = getPluginInfo(owner);
+    pInfo.put(type, handler);
+  }
+
+  /**
+   * A wrapper PluginManager to track registration from a plugin. with this plugin
+   * don't need to pass explicit owner argument during registration.
+   */
+  private class PluginManagerImpl extends AbstractDAGExecutionPluginContext
+  {
+    private final DAGExecutionPlugin owner;
+
+    PluginManagerImpl(DAGExecutionPlugin plugin)
+    {
+      super(appContext, dmgr, stats, launchConfig);
+      this.owner = plugin;
+    }
+
+    @Override
+    public <T> void register(RegistrationType<T> type, Handler<T> handler)
+    {
+      AbstractApexPluginDispatcher.this.register(type, handler, owner);
+    }
+
+    @Override
+    public DAG getDAG()
+    {
+      return clonedDAG;
+    }
+  }
+
+  /**
+   * Dispatch events to plugins.
+   * @param registrationType
+   * @param data
+   * @param <T>
+   */
+  protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data);
+
+  @Override
+  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  {
+    if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) {
+      clonedDAG = SerializationUtils.clone((DAG)data);
+    } else {
+      dispatchEvent(registrationType, data);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
index 19ef91a..b17d5f8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
@@ -53,6 +53,9 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP
   }
 
   @Override
+  public abstract DAG getDAG();
+
+  @Override
   public StramAppContext getApplicationContext()
   {
     return appContext;
@@ -65,12 +68,6 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP
   }
 
   @Override
-  public DAG getDAG()
-  {
-    return dnmgr.getLogicalPlan();
-  }
-
-  @Override
   public String getOperatorName(int id)
   {
     PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
index c6ef54d..234195f 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
@@ -21,7 +21,15 @@ package org.apache.apex.engine.plugin;
 import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
 import org.apache.hadoop.service.Service;
 
+import com.datatorrent.api.DAG;
+
 public interface ApexPluginDispatcher extends Service
 {
+
+  /**
+   * This is internal event, which is not delivered to the plugins.
+   */
+  RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>();
+
   <T> void dispatch(RegistrationType<T> registrationType, T data);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
deleted file mode 100644
index 9f070a1..0000000
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.engine.plugin;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
-import org.apache.apex.engine.api.plugin.PluginLocator;
-import org.apache.commons.digester.plugins.PluginContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.stram.StramAppContext;
-import com.datatorrent.stram.StreamingContainerManager;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.webapp.AppInfo;
-
-/**
- * A default implementation for ApexPluginDispatcher. It handler common tasks such as per handler
- * registration. actual dispatching is left for classes extending from it.
- */
-public abstract class ApexPluginManager extends AbstractService
-{
-  private static final Logger LOG = LoggerFactory.getLogger(ApexPluginManager.class);
-  protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList();
-  protected final StramAppContext appContext;
-  protected final StreamingContainerManager dmgr;
-  private final PluginLocator locator;
-  private final AppInfo.AppStats stats;
-  protected Configuration launchConfig;
-  protected FileContext fileContext;
-  protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
-  protected PluginContext pluginContext;
-
-  public ApexPluginManager(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
-  {
-    super(ApexPluginManager.class.getName());
-    this.locator = locator;
-    this.appContext = context;
-    this.dmgr = dmgr;
-    this.stats = stats;
-    LOG.debug("Creating apex service ");
-  }
-
-  private Configuration readLaunchConfiguration() throws IOException
-  {
-    Path appPath = new Path(appContext.getApplicationPath());
-    Path  configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
-    try {
-      LOG.debug("Reading launch configuration file ");
-      URI uri = appPath.toUri();
-      Configuration config = new YarnConfiguration();
-      fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
-      FSDataInputStream is = fileContext.open(configFilePath);
-      config.addResource(is);
-      LOG.debug("Read launch configuration");
-      return config;
-    } catch (FileNotFoundException ex) {
-      LOG.warn("Configuration file not found {}", configFilePath);
-      return new Configuration();
-    }
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception
-  {
-    super.serviceInit(conf);
-    this.launchConfig = readLaunchConfiguration();
-    if (locator != null) {
-      Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig);
-      if (plugins != null) {
-        this.plugins.addAll(plugins);
-        for (DAGExecutionPlugin plugin : plugins) {
-          LOG.info("Detected plugin {}", plugin);
-        }
-      }
-    }
-
-    for (DAGExecutionPlugin plugin : plugins) {
-      plugin.setup(new PluginManagerImpl(plugin));
-    }
-  }
-
-  @Override
-  protected void serviceStop() throws Exception
-  {
-    for (DAGExecutionPlugin plugin : plugins) {
-      plugin.teardown();
-    }
-    super.serviceStop();
-  }
-
-  /**
-   * Keeps information about plugin and its registrations. Dispatcher use this
-   * information while delivering events to plugin.
-   */
-  class PluginInfo
-  {
-    private final DAGExecutionPlugin plugin;
-    private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
-
-    <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
-    {
-      registrationMap.put(registrationType, handler);
-    }
-
-    <T> Handler<T> get(RegistrationType<T> registrationType)
-    {
-      return (Handler<T>)registrationMap.get(registrationType);
-    }
-
-    public PluginInfo(DAGExecutionPlugin plugin)
-    {
-      this.plugin = plugin;
-    }
-
-    public DAGExecutionPlugin getPlugin()
-    {
-      return plugin;
-    }
-  }
-
-  PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
-  {
-    PluginInfo pInfo = pluginInfoMap.get(plugin);
-    if (pInfo == null) {
-      pInfo = new PluginInfo(plugin);
-      pluginInfoMap.put(plugin, pInfo);
-    }
-    return pInfo;
-  }
-
-  public <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
-  {
-    PluginInfo pInfo = getPluginInfo(owner);
-    pInfo.put(type, handler);
-  }
-
-  /**
-   * A wrapper PluginManager to track registration from a plugin. with this plugin
-   * don't need to pass explicit owner argument during registration.
-   */
-  class PluginManagerImpl extends AbstractDAGExecutionPluginContext
-  {
-    private final DAGExecutionPlugin owner;
-
-    PluginManagerImpl(DAGExecutionPlugin plugin)
-    {
-      super(appContext, dmgr, stats, launchConfig);
-      this.owner = plugin;
-    }
-
-    @Override
-    public <T> void register(RegistrationType<T> type, Handler<T> handler)
-    {
-      ApexPluginManager.this.register(type, handler, owner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
index 0eee85e..bea011c 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
@@ -43,7 +43,7 @@ import com.datatorrent.stram.webapp.AppInfo;
  * service to process the event asynchronously. A separate task {@link DefaultApexPluginDispatcher.ProcessEventTask}
  * is created to process an event and then submitted to the executor for execution.
  */
-public class DefaultApexPluginDispatcher extends ApexPluginManager implements ApexPluginDispatcher
+public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher
 {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class);
   private int qsize = 4098;
@@ -56,7 +56,7 @@ public class DefaultApexPluginDispatcher extends ApexPluginManager implements Ap
   }
 
   @Override
-  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data)
   {
     if (executorService != null) {
       executorService.submit(new ProcessEventTask<>(registrationType, data));

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 4a64b10..5b8ca11 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -18,9 +18,8 @@
  */
 package org.apache.apex.engine.plugin;
 
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
@@ -38,8 +37,7 @@ public class DebugPlugin implements DAGExecutionPlugin
   private int eventCount = 0;
   private int heartbeatCount = 0;
   private int commitCount = 0;
-  private final Lock lock = new ReentrantLock();
-  final Condition events  = lock.newCondition();
+  CountDownLatch latch = new CountDownLatch(3);
 
   @Override
   public void setup(DAGExecutionPluginContext context)
@@ -49,10 +47,8 @@ public class DebugPlugin implements DAGExecutionPlugin
       @Override
       public void handle(StramEvent stramEvent)
       {
-        lock();
         eventCount++;
-        events.signal();
-        unlock();
+        latch.countDown();
       }
     });
 
@@ -61,10 +57,8 @@ public class DebugPlugin implements DAGExecutionPlugin
       @Override
       public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
       {
-        lock();
         heartbeatCount++;
-        events.signal();
-        unlock();
+        latch.countDown();
       }
     });
 
@@ -73,10 +67,8 @@ public class DebugPlugin implements DAGExecutionPlugin
       @Override
       public void handle(Long aLong)
       {
-        lock();
         commitCount++;
-        events.signal();
-        unlock();
+        latch.countDown();
       }
     });
   }
@@ -102,13 +94,8 @@ public class DebugPlugin implements DAGExecutionPlugin
     return commitCount;
   }
 
-  void lock()
+  public void waitForEventDelivery(long timeout) throws InterruptedException
   {
-    this.lock.lock();
-  }
-
-  void unlock()
-  {
-    this.lock.unlock();
+    latch.await(timeout, TimeUnit.SECONDS);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index fda607f..84dc4ba 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -19,7 +19,6 @@
 package org.apache.apex.engine.plugin;
 
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -87,7 +86,6 @@ public class PluginTests
     ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator,
         new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null);
     pluginManager.init(new Configuration());
-    int count = debugPlugin.getEventCount();
     pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG)
     {
       @Override
@@ -96,30 +94,9 @@ public class PluginTests
         return "TestEvent";
       }
     });
-
-    debugPlugin.lock();
-    while (debugPlugin.getEventCount() == count) {
-      debugPlugin.events.await(5, TimeUnit.SECONDS);
-    }
-    debugPlugin.unlock();
-
-    Assert.assertEquals("Total stram event received ", debugPlugin.getEventCount(), 1);
-
-    count = debugPlugin.getCommitCount();
     pluginManager.dispatch(COMMIT_EVENT, new Long(1234));
-    debugPlugin.lock();
-    while (debugPlugin.getCommitCount() == count) {
-      debugPlugin.events.await(5, TimeUnit.SECONDS);
-    }
-    debugPlugin.unlock();
-
-    count = debugPlugin.getHeartbeatCount();
     pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat());
-    debugPlugin.lock();
-    while (debugPlugin.getHeartbeatCount() == count) {
-      debugPlugin.events.await(5, TimeUnit.SECONDS);
-    }
-    debugPlugin.unlock();
+    debugPlugin.waitForEventDelivery(10);
     pluginManager.stop();
 
     Assert.assertEquals(1, debugPlugin.getEventCount());


[2/2] apex-core git commit: Merge branch 'APEXCORE-649' of github.com:tushargosavi/apex-core

Posted by pr...@apache.org.
Merge branch 'APEXCORE-649' of github.com:tushargosavi/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/cfe9cefe
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/cfe9cefe
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/cfe9cefe

Branch: refs/heads/master
Commit: cfe9cefed1400f83dc95a17a1f1f8de921d552b1
Parents: 6cb3e35 d705ed4
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Apr 21 14:51:47 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Fri Apr 21 14:51:47 2017 -0700

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        |   2 +
 .../stram/StreamingContainerManager.java        |   2 +
 .../plugin/AbstractApexPluginDispatcher.java    | 216 +++++++++++++++++++
 .../AbstractDAGExecutionPluginContext.java      |   9 +-
 .../engine/plugin/ApexPluginDispatcher.java     |   8 +
 .../apex/engine/plugin/ApexPluginManager.java   | 191 ----------------
 .../plugin/DefaultApexPluginDispatcher.java     |   4 +-
 .../apache/apex/engine/plugin/DebugPlugin.java  |  29 +--
 .../apache/apex/engine/plugin/PluginTests.java  |  25 +--
 9 files changed, 242 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/cfe9cefe/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------