You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2015/11/01 04:20:53 UTC

incubator-reef git commit: [REEF-844] Allow group comm user to choose topology implementation

Repository: incubator-reef
Updated Branches:
  refs/heads/master c001de6e7 -> 89e74cba8


[REEF-844] Allow group comm user to choose topology implementation

This addressed the issue by
  * Adding a `newCommunicationGroup` API with a topology argument to `GroupCommDriver`

JIRA:
  [REEF-844](https://issues.apache.org/jira/browse/REEF-844)

Pull request:
  This closes #590


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/89e74cba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/89e74cba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/89e74cba

Branch: refs/heads/master
Commit: 89e74cba8e1ce6ecd74d7b7fe804b5353208d22b
Parents: c001de6
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Mon Oct 26 15:32:01 2015 +0900
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Sun Nov 1 11:57:02 2015 +0900

----------------------------------------------------------------------
 .../group/api/driver/GroupCommDriver.java       | 16 ++++++++++
 .../impl/config/parameters/TopologyClass.java   | 32 ++++++++++++++++++++
 .../driver/CommunicationGroupDriverFactory.java |  5 +++
 .../driver/CommunicationGroupDriverImpl.java    | 14 ++++++---
 .../group/impl/driver/GroupCommDriverImpl.java  | 15 +++++++--
 5 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
index d984c24..473bbc1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
@@ -60,6 +60,22 @@ public interface GroupCommDriver {
       int customFanOut);
 
   /**
+   * Create a new communication group with the specified name, topology implementation,
+   * the minimum number of tasks needed in this group before
+   * communication can start, and a custom fanOut.
+   *
+   * @param groupName name class identifying the communication group
+   * @param topologyClass topology implementation to use for this group
+   * @param numberOfTasks minimum number of tasks needed in this group before communication can start
+   * @param customFanOut custom fanOut (bound as the TreeTopology fanOut by the driver factory)
+   * @return the newly created CommunicationGroupDriver
+   */
+  CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> groupName,
+                                                 Class<? extends Topology> topologyClass,
+                                                 int numberOfTasks,
+                                                 int customFanOut);
+
+  /**
    * Tests whether the activeContext is a context configured.
    * using the Group Communication Service
    *

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
new file mode 100644
index 0000000..9b7fa6d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TopologyClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.network.group.api.driver.Topology;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter whose value is the {@link Topology} implementation class to use.
+ */
+@NamedParameter(doc = "NamedParameter for topology implementation")
+public final class TopologyClass implements Name<Class<? extends Topology>> {
+  private TopologyClass() { // private: cannot be instantiated from outside this class
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
index d61cfaf..652d733 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverFactory.java
@@ -23,6 +23,7 @@ import org.apache.reef.driver.parameters.DriverIdentifier;
 import org.apache.reef.driver.task.FailedTask;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
+import org.apache.reef.io.network.group.api.driver.Topology;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.config.parameters.*;
 import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
@@ -64,6 +65,8 @@ public final class CommunicationGroupDriverFactory {
   /**
    * Instantiates a new CommunicationGroupDriver instance.
    * @param groupName specified name of the communication group
+   * @param topologyClass topology implementation
+   * @param commGroupMessageHandler message handler for the communication group
    * @param numberOfTasks minimum number of tasks needed in this group before start
    * @param customFanOut fanOut for TreeTopology
    * @return CommunicationGroupDriver instance
@@ -71,12 +74,14 @@ public final class CommunicationGroupDriverFactory {
    */
   public CommunicationGroupDriver getNewInstance(
       final Class<? extends Name<String>> groupName,
+      final Class<? extends Topology> topologyClass,
       final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
       final int numberOfTasks,
       final int customFanOut) throws InjectionException {
 
     final Injector newInjector = injector.forkInjector();
     newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName);
+    newInjector.bindVolatileParameter(TopologyClass.class, topologyClass);
     newInjector.bindVolatileParameter(CommGroupMessageHandler.class, commGroupMessageHandler);
     newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
     newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index f2a1c1a..c55ed3f 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -83,6 +83,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
   private final SetMap<MsgKey, IndexedMsg> msgQue = new SetMap<>();
 
   private final TopologyFactory topologyFactory;
+  private final Class<? extends Topology> topologyClass;
 
   /**
    * @Deprecated in 0.14. Use Tang to obtain an instance of this instead.
@@ -115,6 +116,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
+    this.topologyClass = TreeTopology.class;
   }
 
   @Inject
@@ -131,7 +133,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
           final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler,
       @Parameter(DriverIdentifier.class) final String driverId,
       @Parameter(CommGroupNumTask.class) final int numberOfTasks,
-      final TopologyFactory topologyFactory) {
+      final TopologyFactory topologyFactory,
+      @Parameter(TopologyClass.class) final Class<? extends Topology> topologyClass) {
     super();
     this.groupName = groupName;
     this.driverId = driverId;
@@ -141,6 +144,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
         groupCommFailedEvaluatorHandler, commGroupMessageHandler);
     this.topologyFactory = topologyFactory;
+    this.topologyClass = topologyClass;
   }
 
   private void registerHandlers(
@@ -166,7 +170,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
 
     final Topology topology;
     try {
-      topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
       throw new RuntimeException(e);
@@ -193,7 +197,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
 
     final Topology topology;
     try {
-      topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
       throw new RuntimeException(e);
@@ -219,7 +223,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
 
     final Topology topology;
     try {
-      topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
       throw new RuntimeException(e);
@@ -245,7 +249,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
 
     final Topology topology;
     try {
-      topology = topologyFactory.getNewInstance(operatorName, TreeTopology.class);
+      topology = topologyFactory.getNewInstance(operatorName, topologyClass);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new topology named {0}", operatorName);
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/89e74cba/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 6c1e9ab..ea84367 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -28,6 +28,7 @@ import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.io.network.Message;
 import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
 import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
+import org.apache.reef.io.network.group.api.driver.Topology;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
 import org.apache.reef.io.network.group.impl.config.parameters.*;
@@ -192,12 +193,20 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
   @Override
   public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
                                                         final int numberOfTasks) {
-    return newCommunicationGroup(groupName, numberOfTasks, fanOut);
+    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, fanOut);
   }
 
   @Override
   public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
                                                         final int numberOfTasks, final int customFanOut) {
+    return newCommunicationGroup(groupName, TreeTopology.class, numberOfTasks, customFanOut);
+  }
+
+  // TODO[JIRA REEF-391]: Allow different topology implementations for different operations in the same CommGroup.
+  @Override
+  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
+                                                        final Class<? extends Topology> topologyClass,
+                                                        final int numberOfTasks, final int customFanOut) {
     LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
         new Object[]{Utils.simpleName(groupName), numberOfTasks});
 
@@ -205,8 +214,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
         = new BroadcastingEventHandler<>();
     final CommunicationGroupDriver commGroupDriver;
     try {
-      commGroupDriver
-          = commGroupDriverFactory.getNewInstance(groupName, commGroupMessageHandler, numberOfTasks, customFanOut);
+      commGroupDriver = commGroupDriverFactory.getNewInstance(
+          groupName, topologyClass, commGroupMessageHandler, numberOfTasks, customFanOut);
     } catch (final InjectionException e) {
       LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
       throw new RuntimeException(e);