You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2015/11/05 08:47:59 UTC
incubator-reef git commit: [REEF-893] Different comm groups should
use different threads to handle group comm messages
Repository: incubator-reef
Updated Branches:
refs/heads/master ad831a1d1 -> 2cf169d78
[REEF-893] Different comm groups should use different threads to handle group comm messages
This addressed the issue by
* Change `groupCommMessageStage` field in `GroupCommDriverImpl` to use `SyncStage`
* Each `CommunicationGroupDriverImpl` uses `SingleThreadStage` for message handling
* Add test to examine corner case which fails with current implementation
JIRA:
[REEF-893](https://issues.apache.org/jira/browse/REEF-893)
Pull request:
This closes #609
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2cf169d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2cf169d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2cf169d7
Branch: refs/heads/master
Commit: 2cf169d782d730abb611036accc38d7b9cc568cd
Parents: ad831a1
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Fri Oct 30 21:06:45 2015 +0900
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Thu Nov 5 16:41:47 2015 +0900
----------------------------------------------------------------------
.../driver/CommunicationGroupDriverFactory.java | 29 ++-
.../driver/CommunicationGroupDriverImpl.java | 16 +-
.../group/impl/driver/GroupCommDriverImpl.java | 11 +-
.../impl/driver/GroupCommMessageHandler.java | 22 ++-
.../org/apache/reef/tests/group/MasterTask.java | 47 +++++
.../tests/group/MultipleCommGroupsDriver.java | 194 +++++++++++++++++++
.../org/apache/reef/tests/group/SlaveTask.java | 47 +++++
.../apache/reef/tests/group/package-info.java | 22 +++
.../reef/tests/group/GroupCommTestSuite.java | 29 +++
.../tests/group/TestMultipleCommGroups.java | 74 +++++++
.../apache/reef/tests/group/package-info.java | 22 +++
11 files changed, 494 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
index 652d733..99ed305 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
@@ -53,16 +53,19 @@ public final class CommunicationGroupDriverFactory {
@Parameter(GroupCommFailedTaskHandler.class)
final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
@Parameter(GroupCommFailedEvalHandler.class)
- final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler) {
+ final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
+ final GroupCommMessageHandler groupCommMessageHandler) {
injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
injector.bindVolatileParameter(DriverIdentifier.class, driverId);
injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
+ injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
}
/**
+ * @deprecated in 0.14.
* Instantiates a new CommunicationGroupDriver instance.
* @param groupName specified name of the communication group
* @param topologyClass topology implementation
@@ -72,6 +75,7 @@ public final class CommunicationGroupDriverFactory {
* @return CommunicationGroupDriver instance
* @throws InjectionException
*/
+ @Deprecated
public CommunicationGroupDriver getNewInstance(
final Class<? extends Name<String>> groupName,
final Class<? extends Topology> topologyClass,
@@ -87,4 +91,27 @@ public final class CommunicationGroupDriverFactory {
newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
return newInjector.getInstance(CommunicationGroupDriver.class);
}
+
+ /**
+ * Instantiates a new CommunicationGroupDriver instance.
+ * @param groupName specified name of the communication group
+ * @param topologyClass topology implementation
+ * @param numberOfTasks minimum number of tasks needed in this group before start
+ * @param customFanOut fanOut for TreeTopology
+ * @return CommunicationGroupDriver instance
+ * @throws InjectionException
+ */
+ public CommunicationGroupDriver getNewInstance(
+ final Class<? extends Name<String>> groupName,
+ final Class<? extends Topology> topologyClass,
+ final int numberOfTasks,
+ final int customFanOut) throws InjectionException {
+
+ final Injector newInjector = injector.forkInjector();
+ newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName);
+ newInjector.bindVolatileParameter(TopologyClass.class, topologyClass);
+ newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
+ newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
+ return newInjector.getInstance(CommunicationGroupDriver.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index c55ed3f..8312920 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -48,6 +48,7 @@ import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.impl.SingleThreadStage;
import javax.inject.Inject;
import java.util.*;
@@ -103,8 +104,10 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
this.confSerializer = confSerializer;
this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
- registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
- groupCommFailedEvaluatorHandler, commGroupMessageHandler);
+ groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
+ groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
+ groupCommFailedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
+ commGroupMessageHandler.addHandler(new TopologyMessageHandler(this));
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(CommGroupNameClass.class, groupName);
injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
@@ -129,8 +132,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
@Parameter(GroupCommFailedEvalHandler.class)
final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
- @Parameter(CommGroupMessageHandler.class)
- final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
+ final GroupCommMessageHandler groupCommMessageHandler,
@Parameter(DriverIdentifier.class) final String driverId,
@Parameter(CommGroupNumTask.class) final int numberOfTasks,
final TopologyFactory topologyFactory,
@@ -142,7 +144,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
- groupCommFailedEvaluatorHandler, commGroupMessageHandler);
+ groupCommFailedEvaluatorHandler, groupCommMessageHandler);
this.topologyFactory = topologyFactory;
this.topologyClass = topologyClass;
}
@@ -151,11 +153,11 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final BroadcastingEventHandler<RunningTask> runningTaskHandler,
final BroadcastingEventHandler<FailedTask> failedTaskHandler,
final BroadcastingEventHandler<FailedEvaluator> failedEvaluatorHandler,
- final BroadcastingEventHandler<GroupCommunicationMessage> groupCommMessageHandler) {
+ final GroupCommMessageHandler groupCommMessageHandler) {
runningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
failedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
failedEvaluatorHandler.addHandler(new TopologyFailedEvaluatorHandler(this));
- groupCommMessageHandler.addHandler(new TopologyMessageHandler(this));
+ groupCommMessageHandler.addHandler(groupName, new SingleThreadStage<>(new TopologyMessageHandler(this), 100 * 100));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 5a6a7a9..cb6b3ca 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -56,7 +56,6 @@ import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.LoggingEventHandler;
-import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
@@ -148,7 +147,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
this.groupCommFailedEvaluatorStage = new SyncStage<>("GroupCommFailedEvaluatorStage",
groupCommFailedEvaluatorHandler);
this.groupCommMessageHandler = new GroupCommMessageHandler();
- this.groupCommMessageStage = new SingleThreadStage<>("GroupCommMessageStage", groupCommMessageHandler, 100 * 1000);
+ this.groupCommMessageStage = new SyncStage<>("GroupCommMessageStage", groupCommMessageHandler);
final Configuration nameResolverConf = Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
.set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, nameServiceAddr)
@@ -182,6 +181,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
+ injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
try {
commGroupDriverFactory = injector.getInstance(CommunicationGroupDriverFactory.class);
@@ -210,19 +210,16 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
new Object[]{Utils.simpleName(groupName), numberOfTasks});
- final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler
- = new BroadcastingEventHandler<>();
final CommunicationGroupDriver commGroupDriver;
try {
- commGroupDriver = commGroupDriverFactory.getNewInstance(
- groupName, topologyClass, commGroupMessageHandler, numberOfTasks, customFanOut);
+ commGroupDriver
+ = commGroupDriverFactory.getNewInstance(groupName, topologyClass, numberOfTasks, customFanOut);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
throw new RuntimeException(e);
}
commGroupDrivers.put(groupName, commGroupDriver);
- groupCommMessageHandler.addHandler(groupName, commGroupMessageHandler);
LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup",
"Created communication group: " + Utils.simpleName(groupName));
return commGroupDriver;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
index 06547de..2735b52 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommMessageHandler.java
@@ -22,7 +22,10 @@ import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.AbstractEStage;
import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.impl.SyncStage;
import java.util.HashMap;
import java.util.Map;
@@ -35,13 +38,24 @@ public class GroupCommMessageHandler implements EventHandler<GroupCommunicationM
private static final Logger LOG = Logger.getLogger(GroupCommMessageHandler.class.getName());
- private final Map<Class<? extends Name<String>>, BroadcastingEventHandler<GroupCommunicationMessage>>
- commGroupMessageHandlers = new HashMap<>();
+ private final Map<Class<? extends Name<String>>, AbstractEStage<GroupCommunicationMessage>>
+ commGroupMessageStages = new HashMap<>();
+ /**
+ * @deprecated in 0.14.
+ */
+ @Deprecated
public void addHandler(final Class<? extends Name<String>> groupName,
final BroadcastingEventHandler<GroupCommunicationMessage> handler) {
LOG.entering("GroupCommMessageHandler", "addHandler", new Object[]{Utils.simpleName(groupName), handler});
- commGroupMessageHandlers.put(groupName, handler);
+ commGroupMessageStages.put(groupName, new SyncStage<>(groupName.getName(), handler));
+ LOG.exiting("GroupCommMessageHandler", "addHandler", Utils.simpleName(groupName));
+ }
+
+ public void addHandler(final Class<? extends Name<String>> groupName,
+ final SingleThreadStage<GroupCommunicationMessage> stage) {
+ LOG.entering("GroupCommMessageHandler", "addHandler", new Object[]{Utils.simpleName(groupName), stage});
+ commGroupMessageStages.put(groupName, stage);
LOG.exiting("GroupCommMessageHandler", "addHandler", Utils.simpleName(groupName));
}
@@ -49,7 +63,7 @@ public class GroupCommMessageHandler implements EventHandler<GroupCommunicationM
public void onNext(final GroupCommunicationMessage msg) {
LOG.entering("GroupCommMessageHandler", "onNext", msg);
final Class<? extends Name<String>> groupName = Utils.getClass(msg.getGroupname());
- commGroupMessageHandlers.get(groupName).onNext(msg);
+ commGroupMessageStages.get(groupName).onNext(msg);
LOG.exiting("GroupCommMessageHandler", "onNext", msg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
new file mode 100644
index 0000000..239d903
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MasterTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * Master task used for testing multiple communication groups.
+ */
+public final class MasterTask implements Task {
+
+ private CommunicationGroupClient commGroupClient;
+
+ @Inject
+ private MasterTask(final GroupCommClient groupCommClient) {
+ this.commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group1.class);
+ if (commGroupClient == null) {
+ commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group2.class);
+ }
+ }
+
+ @Override
+ public byte[] call(final byte[] memento) throws Exception {
+ commGroupClient.getBroadcastSender(MultipleCommGroupsDriver.BroadcastOperatorName.class).send("Hello");
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
new file mode 100644
index 0000000..3582746
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/MultipleCommGroupsDriver.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
+import org.apache.reef.io.network.group.api.driver.GroupCommDriver;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver used for testing multiple communication groups.
+ */
+@DriverSide
+@Unit
+public final class MultipleCommGroupsDriver {
+ private static final Logger LOG = Logger.getLogger(MultipleCommGroupsDriver.class.getName());
+
+ private final EvaluatorRequestor requestor;
+ private final GroupCommDriver groupCommDriver;
+
+ private final String[][] taskIds;
+ private final AtomicInteger[] taskCounter;
+ private final List<CommunicationGroupDriver> commGroupDriverList;
+ private final List<ActiveContext> activeContextsToBeHandled;
+
+ @Inject
+ private MultipleCommGroupsDriver(final EvaluatorRequestor requestor,
+ final GroupCommDriver groupCommDriver) {
+ this.requestor = requestor;
+ this.groupCommDriver = groupCommDriver;
+ taskIds = new String[][]{
+ {"MasterTask-1", "SlaveTask-1-1", "SlaveTask-1-2", "SlaveTask-1-3"},
+ {"MasterTask-2", "SlaveTask-2-1"}
+ };
+ taskCounter = new AtomicInteger[]{new AtomicInteger(0), new AtomicInteger(0)};
+ commGroupDriverList = new ArrayList<>(2);
+ activeContextsToBeHandled = new ArrayList<>(2);
+ initializeCommGroups();
+ }
+
+ private void initializeCommGroups() {
+ commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group1.class, 4));
+ commGroupDriverList.add(groupCommDriver.newCommunicationGroup(Group2.class, 2));
+ commGroupDriverList.get(0).addBroadcast(BroadcastOperatorName.class,
+ BroadcastOperatorSpec.newBuilder()
+ .setSenderId(taskIds[0][0])
+ .setDataCodecClass(SerializableCodec.class)
+ .build());
+ commGroupDriverList.get(1).addBroadcast(BroadcastOperatorName.class,
+ BroadcastOperatorSpec.newBuilder()
+ .setSenderId(taskIds[1][0])
+ .setDataCodecClass(SerializableCodec.class)
+ .build());
+ }
+
+ final class StartHandler implements EventHandler<StartTime> {
+
+ @Override
+ public void onNext(final StartTime startTime) {
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setNumber(4)
+ .setMemory(128)
+ .build());
+ }
+ }
+
+ final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+ @Override
+ public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+ LOG.log(Level.INFO, "Evaluator allocated {0}", allocatedEvaluator);
+ allocatedEvaluator.submitContextAndService(
+ groupCommDriver.getContextConfiguration(), groupCommDriver.getServiceConfiguration());
+ }
+ }
+
+ final class ContextActiveHandler implements EventHandler<ActiveContext> {
+ private final AtomicInteger contextCounter = new AtomicInteger(0);
+
+ @Override
+ public void onNext(final ActiveContext activeContext) {
+ final int count = contextCounter.getAndIncrement();
+
+ if (count <= 1) {
+ LOG.log(Level.INFO, "{0} will be handled after tasks in Group1 started", activeContext);
+ activeContextsToBeHandled.add(activeContext);
+ } else {
+ // Add task to Group1
+ submitTask(activeContext, 0);
+ }
+ }
+ }
+
+ final class TaskRunningHandler implements EventHandler<RunningTask> {
+ private final AtomicInteger runningTaskCounter = new AtomicInteger(0);
+
+ @Override
+ public void onNext(final RunningTask runningTask) {
+ LOG.log(Level.INFO, "{0} has started", runningTask);
+ final int count = runningTaskCounter.getAndIncrement();
+ // After two tasks has started, submit tasks to the active contexts in activeContextsToBeHandled
+ if (count == 1) {
+ for (final ActiveContext activeContext : activeContextsToBeHandled) {
+ // Add task to Group2
+ submitTask(activeContext, 1);
+ }
+ }
+ }
+ }
+
+ private void submitTask(final ActiveContext activeContext, final int groupIndex) {
+ final String taskId = taskIds[groupIndex][taskCounter[groupIndex].getAndIncrement()];
+ LOG.log(Level.INFO, "Got active context {0}. Submit {1}", new Object[]{activeContext, taskId});
+ final Configuration partialTaskConf;
+ if (taskId.equals(taskIds[groupIndex][0])) {
+ partialTaskConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, taskId)
+ .set(TaskConfiguration.TASK, MasterTask.class)
+ .build();
+ } else {
+ partialTaskConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, taskId)
+ .set(TaskConfiguration.TASK, SlaveTask.class)
+ .build();
+ }
+ commGroupDriverList.get(groupIndex).addTask(partialTaskConf);
+ activeContext.submitTask(groupCommDriver.getTaskConfiguration(partialTaskConf));
+ }
+
+ final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+ private final AtomicInteger completedTaskCounter = new AtomicInteger(0);
+
+ @Override
+ public void onNext(final CompletedTask completedTask) {
+ final int count = completedTaskCounter.getAndIncrement();
+ LOG.log(Level.INFO, "{0} has completed.", completedTask);
+ if (count <= 1) {
+ // Add task to Group1
+ submitTask(completedTask.getActiveContext(), 0);
+ } else {
+ completedTask.getActiveContext().close();
+ }
+ }
+ }
+
+ @NamedParameter()
+ final class Group1 implements Name<String> {
+ }
+
+ @NamedParameter()
+ final class Group2 implements Name<String> {
+ }
+
+ @NamedParameter()
+ final class BroadcastOperatorName implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
new file mode 100644
index 0000000..cc8a76b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/SlaveTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * Slave task used for testing multiple communication groups.
+ */
+public final class SlaveTask implements Task {
+
+ private CommunicationGroupClient commGroupClient;
+
+ @Inject
+ private SlaveTask(final GroupCommClient groupCommClient) {
+ this.commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group1.class);
+ if (commGroupClient == null) {
+ commGroupClient = groupCommClient.getCommunicationGroup(MultipleCommGroupsDriver.Group2.class);
+ }
+ }
+
+ @Override
+ public byte[] call(final byte[] memento) throws Exception {
+ commGroupClient.getBroadcastReceiver(MultipleCommGroupsDriver.BroadcastOperatorName.class).receive();
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
new file mode 100644
index 0000000..c67ea9e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for group communication.
+ */
+package org.apache.reef.tests.group;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
new file mode 100644
index 0000000..8e990cb
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TestMultipleCommGroups.class
+ })
+public final class GroupCommTestSuite {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
new file mode 100644
index 0000000..5dad294
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/TestMultipleCommGroups.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.network.group.impl.driver.GroupCommService;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Launch Group Communication test using multiple communication groups.
+ */
+public class TestMultipleCommGroups {
+ private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+ /**
+ * Set up the test environment.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.testEnvironment.setUp();
+ }
+
+ /**
+ * Tear down the test environment.
+ */
+ @After
+ public void tearDown() throws Exception {
+ this.testEnvironment.tearDown();
+ }
+
+ /**
+ * Run the MultipleCommGroups test.
+ */
+ @Test
+ public void testMultipleCommGroups() {
+ final Configuration driverConf = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(MultipleCommGroupsDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_MultipleCommGroups")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, MultipleCommGroupsDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, MultipleCommGroupsDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, MultipleCommGroupsDriver.ContextActiveHandler.class)
+ .set(DriverConfiguration.ON_TASK_RUNNING, MultipleCommGroupsDriver.TaskRunningHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, MultipleCommGroupsDriver.TaskCompletedHandler.class)
+ .build();
+ final Configuration groupCommConf = GroupCommService.getConfiguration();
+ final LauncherStatus state = this.testEnvironment.run(Configurations.merge(driverConf, groupCommConf));
+ Assert.assertTrue("Job state after execution: " + state, state.isSuccess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cf169d7/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
new file mode 100644
index 0000000..c67ea9e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for group communication.
+ */
+package org.apache.reef.tests.group;