You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2015/11/01 04:20:53 UTC
incubator-reef git commit: [REEF-844] Allow group comm user to choose
topology implementation
Repository: incubator-reef
Updated Branches:
refs/heads/master c001de6e7 -> 89e74cba8
[REEF-844] Allow group comm user to choose topology implementation
This addresses the issue by:
* Adding a `newCommunicationGroup` API with a topology argument to `GroupCommDriver`
JIRA:
[REEF-844](https://issues.apache.org/jira/browse/REEF-844)
Pull request:
This closes #590
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/89e74cba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/89e74cba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/89e74cba
Branch: refs/heads/master
Commit: 89e74cba8e1ce6ecd74d7b7fe804b5353208d22b
Parents: c001de6
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Mon Oct 26 15:32:01 2015 +0900
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Sun Nov 1 11:57:02 2015 +0900
----------------------------------------------------------------------
.../group/api/driver/GroupCommDriver.java | 16 ++++++++++
.../impl/config/parameters/TopologyClass.java | 32 ++++++++++++++++++++
.../driver/CommunicationGroupDriverFactory.java | 5 +++
.../driver/CommunicationGroupDriverImpl.java | 14 ++++++---
.../group/impl/driver/GroupCommDriverImpl.java | 15 +++++++--
5 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
index d984c24..473bbc1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
@@ -60,6 +60,22 @@ public interface GroupCommDriver {
int customFanOut);
/**
+ * Create a new communication group with the specified name, topology implementation,
+ * the minimum number of tasks needed in this group before
+ * communication can start, and a custom fanOut.
+ *
+ * @param groupName
+ * @param topologyClass
+ * @param numberOfTasks
+ * @param customFanOut
+ * @return
+ */
+ CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> groupName,
+ Class<? extends Topology> topologyClass,
+ int numberOfTasks,
+ int customFanOut);
+
+ /**
* Tests whether the activeContext is a context configured.
* using the Group Communication Service
*
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
new file mode 100644
index 0000000..9b7fa6d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.network.group.api.driver.Topology;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * NamedParameter for topology implementation.
+ */
+@NamedParameter(doc = "NamedParameter for topology implementation")
+public final class TopologyClass implements Name<Class<? extends Topology>> {
+ private TopologyClass() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
index d61cfaf..652d733 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
@@ -23,6 +23,7 @@ import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
+import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
@@ -64,6 +65,8 @@ public final class CommunicationGroupDriverFactory {
/**
* Instantiates a new CommunicationGroupDriver instance.
* @param groupName specified name of the communication group
+ * @param topologyClass topology implementation
+ * @param commGroupMessageHandler message handler for the communication group
* @param numberOfTasks minimum number of tasks needed in this group before start
* @param customFanOut fanOut for TreeTopology
* @return CommunicationGroupDriver instance
@@ -71,12 +74,14 @@ public final class CommunicationGroupDriverFactory {
*/
public CommunicationGroupDriver getNewInstance(
final Class<? extends Name<String>> groupName,
+ final Class<? extends Topology> topologyClass,
final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
final int numberOfTasks,
final int customFanOut) throws InjectionException {
final Injector newInjector = injector.forkInjector();
newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName);
+ newInjector.bindVolatileParameter(TopologyClass.class, topologyClass);
newInjector.bindVolatileParameter(CommGroupMessageHandler.class, commGroupMessageHandler);
newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index f2a1c1a..c55ed3f 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -83,6 +83,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
private final SetMap<MsgKey, IndexedMsg> msgQue = new SetMap<>();
private final TopologyFactory topologyFactory;
+ private final Class<? extends Topology> topologyClass;
/**
* @Deprecated in 0.14. Use Tang to obtain an instance of this instead.
@@ -115,6 +116,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
+ this.topologyClass = TreeTopology.class;
}
@Inject
@@ -131,7 +133,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
@Parameter(DriverIdentifier.class) final String driverId,
@Parameter(CommGroupNumTask.class) final int numberOfTasks,
- final TopologyFactory topologyFactory) {
+ final TopologyFactory topologyFactory,
+ @Parameter(TopologyClass.class) final Class<? extends Topology> topologyClass) {
super();
this.groupName = groupName;
this.driverId = driverId;
@@ -141,6 +144,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
groupCommFailedEvaluatorHandler, commGroupMessageHandler);
this.topologyFactory = topologyFactory;
+ this.topologyClass = topologyClass;
}
private void registerHandlers(
@@ -166,7 +170,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology;
try {
- topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+ topology = topologyFactory.getNewInstance(operatorName, topologyClass);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
throw new RuntimeException(e);
@@ -193,7 +197,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology;
try {
- topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+ topology = topologyFactory.getNewInstance(operatorName, topologyClass);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
throw new RuntimeException(e);
@@ -219,7 +223,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology;
try {
- topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+ topology = topologyFactory.getNewInstance(operatorName, topologyClass);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
throw new RuntimeException(e);
@@ -245,7 +249,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology;
try {
- topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+ topology = topologyFactory.getNewInstance(operatorName, topologyClass);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 6c1e9ab..ea84367 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -28,6 +28,7 @@ import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
+import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
import org.apache.reef.io.network.group.impl.config.parameters.*;
@@ -192,12 +193,20 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
@Override
public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
final int numberOfTasks) {
- return newCommunicationGroup(groupName, numberOfTasks, fanOut);
+ return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut);
}
@Override
public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
final int numberOfTasks, final int customFanOut) {
+ return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut);
+ }
+
+ // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup.
+ @Override
+ public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
+ final Class<? extends Topology> topologyClass,
+ final int numberOfTasks, final int customFanOut) {
LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
new Object[]{Utils.simpleName(groupName), numberOfTasks});
@@ -205,8 +214,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
= new BroadcastingEventHandler<>();
final CommunicationGroupDriver commGroupDriver;
try {
- commGroupDriver
- = commGroupDriverFactory.getNewInstance(groupName, commGroupMessageHandler, numberOfTasks, customFanOut);
+ commGroupDriver = commGroupDriverFactory.getNewInstance(
+ groupName, topologyClass, commGroupMessageHandler, numberOfTasks, customFanOut);
} catch (final InjectionException e) {
LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
throw new RuntimeException(e);