You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by js...@apache.org on 2015/11/12 04:20:10 UTC
incubator-reef git commit: [REEF-923] Make group comm run with a
number of tasks less than the initial count
Repository: incubator-reef
Updated Branches:
refs/heads/master 4fbc9038e -> 0b42c728d
[REEF-923] Make group comm run with a number of tasks less than the initial count
This addresses the issue by:
* Checking the number of running tasks only during initialization
JIRA:
[REEF-923](https://issues.apache.org/jira/browse/REEF-923)
Pull Request:
Closes #622
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0b42c728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0b42c728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0b42c728
Branch: refs/heads/master
Commit: 0b42c728daecfac31b78d7fafcca81339d902bdc
Parents: 4fbc903
Author: Gyeongin Yu <gy...@gmail.com>
Authored: Fri Nov 6 19:24:11 2015 +0900
Committer: Jason (Joo Seong) Jeong <cu...@gmail.com>
Committed: Thu Nov 12 12:19:35 2015 +0900
----------------------------------------------------------------------
.../driver/CommunicationGroupDriverImpl.java | 26 +++++++++++---------
1 file changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0b42c728/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index 6efa128..74e094c 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -38,7 +38,6 @@ import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
import org.apache.reef.io.network.group.impl.utils.CountingSemaphore;
import org.apache.reef.io.network.group.impl.utils.SetMap;
import org.apache.reef.io.network.group.impl.utils.Utils;
-import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
@@ -72,7 +71,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
private final ConfigurationSerializer confSerializer;
private final String driverId;
- private final CountingSemaphore allTasksAdded;
+ private final CountingSemaphore allInitialTasksRunning;
private final Object topologiesLock = new Object();
private final Object configLock = new Object();
@@ -102,7 +101,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
this.groupName = groupName;
this.driverId = driverId;
this.confSerializer = confSerializer;
- this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
+ this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
groupCommRunningTaskHandler.addHandler(new TopologyRunningTaskHandler(this));
groupCommFailedTaskHandler.addHandler(new TopologyFailedTaskHandler(this));
@@ -141,7 +140,7 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
this.groupName = groupName;
this.driverId = driverId;
this.confSerializer = confSerializer;
- this.allTasksAdded = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
+ this.allInitialTasksRunning = new CountingSemaphore(numberOfTasks, getQualifiedName(), topologiesLock);
registerHandlers(groupCommRunningTaskHandler, groupCommFailedTaskHandler,
groupCommFailedEvaluatorHandler, groupCommMessageHandler);
@@ -431,8 +430,9 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology = topologies.get(operName);
topology.onRunningTask(id);
}
-
- allTasksAdded.decrement();
+ if (initializing.get()) {
+ allInitialTasksRunning.decrement();
+ }
perTaskState.put(id, TaskState.RUNNING);
LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire yetToRunLock");
} else {
@@ -485,7 +485,9 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology = topologies.get(operName);
topology.onFailedTask(id);
}
- allTasksAdded.increment();
+ if (initializing.get()) {
+ allInitialTasksRunning.increment();
+ }
perTaskState.put(id, TaskState.FAILED);
LOG.finest(getQualifiedName() + "Removing msgs associated with dead task " + id + " from msgQue.");
final Set<MsgKey> keys = msgQue.keySet();
@@ -594,12 +596,12 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
LOG.finer(getQualifiedName() + "Discarding msg. Released topologiesLock");
return;
}
- if (initializing.get() || msg.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) {
+ if (initializing.get()) {
LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" +
- allTasksAdded.getInitialCount() + ") nodes to run");
- allTasksAdded.await();
- LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" + allTasksAdded.getInitialCount() +
- ") nodes are running");
+ allInitialTasksRunning.getInitialCount() + ") nodes to run");
+ allInitialTasksRunning.await();
+ LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" +
+ allInitialTasksRunning.getInitialCount() + ") nodes are running");
initializing.compareAndSet(true, false);
}
queNProcessMsg(msg);