You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2015/11/05 08:47:59 UTC

incubator-reef git commit: [REEF-893] Different comm groups should use different threads to handle group comm messages

Repository: incubator-reef
Updated Branches:
  refs/heads/master ad831a1d1 -> 2cf169d78


[REEF-893] Different comm groups should use different threads to handle group comm messages

This addresses the issue by:
  * Changing the `groupCommMessageStage` field in `GroupCommDriverImpl` to use `SyncStage`
  * Making each `CommunicationGroupDriverImpl` use a `SingleThreadStage` for message handling
  * Adding a test that examines a corner case which fails without this change

JIRA:
  [REEF-893](https://issues.apache.org/jira/browse/REEF-893)

Pull request:
  This closes #609


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2cf169d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2cf169d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2cf169d7

Branch: refs/heads/master
Commit: 2cf169d782d730abb611036accc38d7b9cc568cd
Parents: ad831a1
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Fri Oct 30 21:06:45 2015 +0900
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Thu Nov 5 16:41:47 2015 +0900

----------------------------------------------------------------------
 .../driver/CommunicationGroupDriverFactory.java |  29 ++-
 .../driver/CommunicationGroupDriverImpl.java    |  16 +-
 .../group/impl/driver/GroupCommDriverImpl.java  |  11 +-
 .../impl/driver/GroupCommMessageHandler.java    |  22 ++-
 .../org/apache/reef/tests/group/MasterTask.java |  47 +++++
 .../tests/group/MultipleCommGroupsDriver.java   | 194 +++++++++++++++++++
 .../org/apache/reef/tests/group/SlaveTask.java  |  47 +++++
 .../apache/reef/tests/group/package-info.java   |  22 +++
 .../reef/tests/group/GroupCommTestSuite.java    |  29 +++
 .../tests/group/TestMultipleCommGroups.java     |  74 +++++++
 .../apache/reef/tests/group/package-info.java   |  22 +++
 11 files changed, 494 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
index 652d733..99ed305 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
@@ -53,16 +53,19 @@ public final class CommunicationGroupDriverFactory {
       @Parameter(GroupCommFailedTaskHandler.class)
           final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
       @Parameter(GroupCommFailedEvalHandler.class)
-          final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler) {
+          final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
+      final GroupCommMessageHandler groupCommMessageHandler) {
     injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
     injector.bindVolatileParameter(DriverIdentifier.class, driverId);
     injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
     injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
     injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
+    injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
   }
 
   /**
+   * @deprecated in 0.14.
    * Instantiates a new CommunicationGroupDriver instance.
    * @param groupName specified name of the communication group
    * @param topologyClass topology implementation
@@ -72,6 +75,7 @@ public final class CommunicationGroupDriverFactory {
    * @return CommunicationGroupDriver instance
    * @throws InjectionException
    */
+  @Deprecated
   public CommunicationGroupDriver getNewInstance(
       final Class<? extends Name<String>> groupName,
       final Class<? extends Topology> topologyClass,
@@ -87,4 +91,27 @@ public final class CommunicationGroupDriverFactory {
     newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
     return newInjector.getInstance(CommunicationGroupDriver.class);
   }
+
+  /**
+   * Instantiates a new CommunicationGroupDriver instance.
+   * @param groupName specified name of the communication group
+   * @param topologyClass topology implementation
+   * @param numberOfTasks minimum number of tasks needed in this group before start
+   * @param customFanOut fanOut for TreeTopology
+   * @return CommunicationGroupDriver instance
+   * @throws InjectionException
+   */
+  public CommunicationGroupDriver getNewInstance(
+      final Class<? extends Name<String>> groupName,
+      final Class<? extends Topology> topologyClass,
+      final int numberOfTasks,
+      final int customFanOut) throws InjectionException {
+
+    final Injector newInjector = injector.forkInjector();
+    newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName);
+    newInjector.bindVolatileParameter(TopologyClass.class, topologyClass);
+    newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
+    newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
+    return newInjector.getInstance(CommunicationGroupDriver.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index c55ed3f..8312920 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -48,6 +48,7 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.impl.SingleThreadStage;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -103,8 +104,10 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     this.confSerializer = confSerializer;
     this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
 
-    registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
-        groupCommFailedEvaluatorHandler, commGroupMessageHandler);
+    groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
+    groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
+    groupCommFailedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
+    commGroupMessageHandler.addHandler(new TopologyMessageHandler(this));
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(CommGroupNameClass.class, groupName);
     injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
@@ -129,8 +132,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
           final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
       @Parameter(GroupCommFailedEvalHandler.class)
           final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
-      @Parameter(CommGroupMessageHandler.class)
-          final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
+          final GroupCommMessageHandler groupCommMessageHandler,
       @Parameter(DriverIdentifier.class) final String driverId,
       @Parameter(CommGroupNumTask.class) final int numberOfTasks,
       final TopologyFactory topologyFactory,
@@ -142,7 +144,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
 
     registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
-        groupCommFailedEvaluatorHandler, commGroupMessageHandler);
+        groupCommFailedEvaluatorHandler, groupCommMessageHandler);
     this.topologyFactory = topologyFactory;
     this.topologyClass = topologyClass;
   }
@@ -151,11 +153,11 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       final BroadcastingEventHandler<RunningTask> runningTaskHandler,
       final BroadcastingEventHandler<FailedTask> failedTaskHandler,
       final BroadcastingEventHandler<FailedEvaluator> failedEvaluatorHandler,
-      final BroadcastingEventHandler<GroupCommunicationMessage> groupCommMessageHandler) {
+      final GroupCommMessageHandler groupCommMessageHandler) {
     runningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
     failedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
     failedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
-    groupCommMessageHandler.addHandler(new TopologyMessageHandler(this));
+    groupCommMessageHandler.addHandler(groupName, new SingleThreadStage<>(new TopologyMessageHandler(this), 100 * 100));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 5a6a7a9..cb6b3ca 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -56,7 +56,6 @@ import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.impl.LoggingEventHandler;
-import org.apache.reef.wake.impl.SingleThreadStage;
 import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
@@ -148,7 +147,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
     this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage",
         groupCommFailedEvaluatorHandler);
     this.groupCommMessageHandler = new GroupCommMessageHandler();
-    this.groupCommMessageStage = new SingleThreadStage<>("GroupCommMessageStage", groupCommMessageHandler, 100 * 1000);
+    this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler);
 
     final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
         .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr)
@@ -182,6 +181,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
     injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
     injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
     injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
+    injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
 
     try {
       commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class);
@@ -210,19 +210,16 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
     LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
         new Object[]{Utils.simpleName(groupName), numberOfTasks});
 
-    final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler
-        = new BroadcastingEventHandler<>();
     final CommunicationGroupDriver commGroupDriver;
     try {
-      commGroupDriver = commGroupDriverFactory.getNewInstance(
-          groupName, topologyClass, commGroupMessageHandler, numberOfTasks, customFanOut);
+      commGroupDriver
+          = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
       throw new RuntimeException(e);
     }
 
     commGroupDrivers.put(groupName, commGroupDriver);
-    groupCommMessageHandler.addHandler(groupName, commGroupMessageHandler);
     LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup",
         "Created communication group: " + Utils.simpleName(groupName));
     return commGroupDriver;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
index 06547de..2735b52 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
@@ -22,7 +22,10 @@ import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
 import org.apache.reef.io.network.group.impl.utils.Utils;
 import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.AbstractEStage;
 import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.impl.SyncStage;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -35,13 +38,24 @@ public class GroupCommMessageHandler implements EventHandler<GroupCommunicationM
 
   private static final Logger LOG = Logger.getLogger(GroupCommMessageHandler.class.getName());
 
-  private final Map<Class<? extends Name<String>>, BroadcastingEventHandler<GroupCommunicationMessage>>
-      commGroupMessageHandlers = new HashMap<>();
+  private final Map<Class<? extends Name<String>>, AbstractEStage<GroupCommunicationMessage>>
+      commGroupMessageStages = new HashMap<>();
 
+  /**
+   * @deprecated in 0.14.
+   */
+  @Deprecated
   public void addHandler(final Class<? extends Name<String>> groupName,
                          final BroadcastingEventHandler<GroupCommunicationMessage> handler) {
     LOG.entering("GroupCommMessageHandler", "addHandler", new Object[]{Utils.simpleName(groupName), handler});
-    commGroupMessageHandlers.put(groupName, handler);
+    commGroupMessageStages.put(groupName, new SyncStage<>(groupName.getName(), handler));
+    LOG.exiting("GroupCommMessageHandler", "addHandler", Utils.simpleName(groupName));
+  }
+
+  public void addHandler(final Class<? extends Name<String>> groupName,
+                         final SingleThreadStage<GroupCommunicationMessage> stage) {
+    LOG.entering("GroupCommMessageHandler", "addHandler", new Object[]{Utils.simpleName(groupName), stage});
+    commGroupMessageStages.put(groupName, stage);
     LOG.exiting("GroupCommMessageHandler", "addHandler", Utils.simpleName(groupName));
   }
 
@@ -49,7 +63,7 @@ public class GroupCommMessageHandler implements EventHandler<GroupCommunicationM
   public void onNext(final GroupCommunicationMessage msg) {
     LOG.entering("GroupCommMessageHandler", "onNext", msg);
     final Class<? extends Name<String>> groupName = Utils.getClass(msg.getGroupname());
-    commGroupMessageHandlers.get(groupName).onNext(msg);
+    commGroupMessageStages.get(groupName).onNext(msg);
     LOG.exiting("GroupCommMessageHandler", "onNext", msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
new file mode 100644
index 0000000..239d903
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * Master task used for testing multiple communication groups.
+ */
+public final class MasterTask implements Task {
+
+  private CommunicationGroupClient commGroupClient;
+
+  @Inject
+  private MasterTask(final GroupCommClient groupCommClient) {
+    this.commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group1.class);
+    if (commGroupClient == null) {
+      commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group2.class);
+    }
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) throws Exception {
+    commGroupClient.getBroadcastSender(MultipleCommGroupsDriver.BroadcastOperatorName.class).send("Hello");
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
new file mode 100644
index 0000000..3582746
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
+import org.apache.reef.io.network.group.api.driver.GroupCommDriver;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver used for testing multiple communication groups.
+ */
+@DriverSide
+@Unit
+public final class MultipleCommGroupsDriver {
+  private static final Logger LOG = Logger.getLogger(MultipleCommGroupsDriver.class.getName());
+
+  private final EvaluatorRequestor requestor;
+  private final GroupCommDriver groupCommDriver;
+
+  private final String[][] taskIds;
+  private final AtomicInteger[] taskCounter;
+  private final List<CommunicationGroupDriver> commGroupDriverList;
+  private final List<ActiveContext> activeContextsToBeHandled;
+
+  @Inject
+  private MultipleCommGroupsDriver(final EvaluatorRequestor requestor,
+                                   final GroupCommDriver groupCommDriver) {
+    this.requestor = requestor;
+    this.groupCommDriver = groupCommDriver;
+    taskIds = new String[][]{
+        {"MasterTask-1", "SlaveTask-1-1", "SlaveTask-1-2", "SlaveTask-1-3"},
+        {"MasterTask-2", "SlaveTask-2-1"}
+    };
+    taskCounter = new AtomicInteger[]{new AtomicInteger(0), new AtomicInteger(0)};
+    commGroupDriverList = new ArrayList<>(2);
+    activeContextsToBeHandled = new ArrayList<>(2);
+    initializeCommGroups();
+  }
+
+  private void initializeCommGroups() {
+    commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group1.class, 4));
+    commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group2.class, 2));
+    commGroupDriverList.get(0).addBroadcast(BroadcastOperatorName.class,
+        BroadcastOperatorSpec.newBuilder()
+            .setSenderId(taskIds[0][0])
+            .setDataCodecClass(SerializableCodec.class)
+            .build());
+    commGroupDriverList.get(1).addBroadcast(BroadcastOperatorName.class,
+        BroadcastOperatorSpec.newBuilder()
+            .setSenderId(taskIds[1][0])
+            .setDataCodecClass(SerializableCodec.class)
+            .build());
+  }
+
+  final class StartHandler implements EventHandler<StartTime> {
+
+    @Override
+    public void onNext(final StartTime startTime) {
+      requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(4)
+          .setMemory(128)
+          .build());
+    }
+  }
+
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Evaluator allocated {0}", allocatedEvaluator);
+      allocatedEvaluator.submitContextAndService(
+          groupCommDriver.getContextConfiguration(), groupCommDriver.getServiceConfiguration());
+    }
+  }
+
+  final class ContextActiveHandler implements EventHandler<ActiveContext> {
+    private final AtomicInteger contextCounter = new AtomicInteger(0);
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      final int count = contextCounter.getAndIncrement();
+
+      if (count <= 1) {
+        LOG.log(Level.INFO, "{0} will be handled after tasks in Group1 started", activeContext);
+        activeContextsToBeHandled.add(activeContext);
+      } else {
+        // Add task to Group1
+        submitTask(activeContext, 0);
+      }
+    }
+  }
+
+  final class TaskRunningHandler implements EventHandler<RunningTask> {
+    private final AtomicInteger runningTaskCounter = new AtomicInteger(0);
+
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      LOG.log(Level.INFO, "{0} has started", runningTask);
+      final int count = runningTaskCounter.getAndIncrement();
+      // After two tasks have started, submit tasks to the active contexts in activeContextsToBeHandled
+      if (count == 1) {
+        for (final ActiveContext activeContext : activeContextsToBeHandled) {
+          // Add task to Group2
+          submitTask(activeContext, 1);
+        }
+      }
+    }
+  }
+
+  private void submitTask(final ActiveContext activeContext, final int groupIndex) {
+    final String taskId = taskIds[groupIndex][taskCounter[groupIndex].getAndIncrement()];
+    LOG.log(Level.INFO, "Got active context {0}. Submit {1}", new Object[]{activeContext, taskId});
+    final Configuration partialTaskConf;
+    if (taskId.equals(taskIds[groupIndex][0])) {
+      partialTaskConf = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskId)
+          .set(TaskConfiguration.TASK, MasterTask.class)
+          .build();
+    } else {
+      partialTaskConf = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskId)
+          .set(TaskConfiguration.TASK, SlaveTask.class)
+          .build();
+    }
+    commGroupDriverList.get(groupIndex).addTask(partialTaskConf);
+    activeContext.submitTask(groupCommDriver.getTaskConfiguration(partialTaskConf));
+  }
+
+  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+    private final AtomicInteger completedTaskCounter = new AtomicInteger(0);
+
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+      final int count = completedTaskCounter.getAndIncrement();
+      LOG.log(Level.INFO, "{0} has completed.", completedTask);
+      if (count <= 1) {
+        // Add task to Group1
+        submitTask(completedTask.getActiveContext(), 0);
+      } else {
+        completedTask.getActiveContext().close();
+      }
+    }
+  }
+
+  @NamedParameter()
+  final class Group1 implements Name<String> {
+  }
+
+  @NamedParameter()
+  final class Group2 implements Name<String> {
+  }
+
+  @NamedParameter()
+  final class BroadcastOperatorName implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
new file mode 100644
index 0000000..cc8a76b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * Slave task used for testing multiple communication groups.
+ */
+public final class SlaveTask implements Task {
+
+  private CommunicationGroupClient commGroupClient;
+
+  @Inject
+  private SlaveTask(final GroupCommClient groupCommClient) {
+    this.commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group1.class);
+    if (commGroupClient == null) {
+      commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group2.class);
+    }
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) throws Exception {
+    commGroupClient.getBroadcastReceiver(MultipleCommGroupsDriver.BroadcastOperatorName.class).receive();
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
new file mode 100644
index 0000000..c67ea9e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for group communication.
+ */
+package org.apache.reef.tests.group;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
new file mode 100644
index 0000000..8e990cb
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TestMultipleCommGroups.class
+    })
+public final class GroupCommTestSuite { // Intentionally empty: the annotations above define the suite.
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
new file mode 100644
index 0000000..5dad294
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.network.group.impl.driver.GroupCommService;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs MultipleCommGroupsDriver with the group communication service and checks that the job succeeds.
+ */
+public class TestMultipleCommGroups {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Set up the test environment before each test method runs.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.testEnvironment.setUp();
+  }
+
+  /**
+   * Tear down the test environment after each test method has run.
+   */
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  /**
+   * Build the driver and group communication configurations, run the merged job and assert that it succeeded.
+   */
+  @Test
+  public void testMultipleCommGroups() {
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(MultipleCommGroupsDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_MultipleCommGroups")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, MultipleCommGroupsDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, MultipleCommGroupsDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, MultipleCommGroupsDriver.ContextActiveHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, MultipleCommGroupsDriver.TaskRunningHandler.class)
+        .set(DriverConfiguration.ON_TASK_COMPLETED, MultipleCommGroupsDriver.TaskCompletedHandler.class)
+        .build();
+    final Configuration groupCommConf = GroupCommService.getConfiguration();
+    final LauncherStatus state = this.testEnvironment.run(Configurations.merge(driverConf, groupCommConf));
+    Assert.assertTrue("Job state after execution: " + state, state.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
new file mode 100644
index 0000000..c67ea9e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for group communication.
+ */
+package org.apache.reef.tests.group;