You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by js...@apache.org on 2015/11/12 04:20:10 UTC

incubator-reef git commit: [REEF-923] Make group comm run with a number of tasks less than the initial count

Repository: incubator-reef
Updated Branches:
  refs/heads/master 4fbc9038e -> 0b42c728d


[REEF-923] Make group comm run with a number of tasks less than the initial count

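This addresses the issue by:
  * Checking the number of running tasks only during initialization, so a task that
    fails after initialization no longer makes later messages wait for the initial
    task count to be reached again
  * No longer blocking UpdateTopology messages on that check once initialization is done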

JIRA:
  [REEF-923](https://issues.apache.org/jira/browse/REEF-923)

Pull Request:
  Closes #622


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0b42c728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0b42c728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0b42c728

Branch: refs/heads/master
Commit: 0b42c728daecfac31b78d7fafcca81339d902bdc
Parents: 4fbc903
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Fri Nov 6 19:24:11 2015 +0900
Committer: Jason (Joo Seong) Jeong <cu...@gmail.com>
Committed: Thu Nov 12 12:19:35 2015 +0900

----------------------------------------------------------------------
 .../driver/CommunicationGroupDriverImpl.java    | 26 +++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0b42c728/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index 6efa128..74e094c 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -38,7 +38,6 @@ import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
 import org.apache.reef.io.network.group.impl.utils.CountingSemaphore;
 import org.apache.reef.io.network.group.impl.utils.SetMap;
 import org.apache.reef.io.network.group.impl.utils.Utils;
-import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.JavaConfigurationBuilder;
@@ -72,7 +71,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
   private final ConfigurationSerializer confSerializer;
   private final String driverId;
 
-  private final CountingSemaphore allTasksAdded;
+  private final CountingSemaphore allInitialTasksRunning;
 
   private final Object topologiesLock = new Object();
   private final Object configLock = new Object();
@@ -102,7 +101,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     this.groupName = groupName;
     this.driverId = driverId;
     this.confSerializer = confSerializer;
-    this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
+    this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
 
     groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
     groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
@@ -141,7 +140,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     this.groupName = groupName;
     this.driverId = driverId;
     this.confSerializer = confSerializer;
-    this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
+    this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
 
     registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
         groupCommFailedEvaluatorHandler, groupCommMessageHandler);
@@ -431,8 +430,9 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
           final Topology topology = topologies.get(operName);
           topology.onRunningTask(id);
         }
-
-        allTasksAdded.decrement();
+        if (initializing.get()) {
+          allInitialTasksRunning.decrement();
+        }
         perTaskState.put(id, TaskState.RUNNING);
         LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire yetToRunLock");
       } else {
@@ -485,7 +485,9 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
         final Topology topology = topologies.get(operName);
         topology.onFailedTask(id);
       }
-      allTasksAdded.increment();
+      if (initializing.get()) {
+        allInitialTasksRunning.increment();
+      }
       perTaskState.put(id, TaskState.FAILED);
       LOG.finest(getQualifiedName() + "Removing msgs associated with dead task " + id + " from msgQue.");
       final Set<MsgKey> keys = msgQue.keySet();
@@ -594,12 +596,12 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
         LOG.finer(getQualifiedName() + "Discarding msg. Released topologiesLock");
         return;
       }
-      if (initializing.get() || msg.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) {
+      if (initializing.get()) {
         LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" +
-            allTasksAdded.getInitialCount() + ") nodes to run");
-        allTasksAdded.await();
-        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" + allTasksAdded.getInitialCount() +
-            ") nodes are running");
+            allInitialTasksRunning.getInitialCount() + ") nodes to run");
+        allInitialTasksRunning.await();
+        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" +
+            allInitialTasksRunning.getInitialCount() + ") nodes are running");
         initializing.compareAndSet(true, false);
       }
       queNProcessMsg(msg);