You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/02/05 00:35:10 UTC
reef git commit: [REEF-1186] Remove wrong inject annotation on
OperatorTopologyImpl
Repository: reef
Updated Branches:
refs/heads/master dd724ffdc -> 24e73feb9
[REEF-1186] Remove wrong inject annotation on OperatorTopologyImpl
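This removes the erroneous @Inject annotation (and its now-unused import) from the OperatorTopologyImpl constructor and fixes several typos in a field name, log messages, and a Javadoc comment.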
JIRA:
[REEF-1186](https://issues.apache.org/jira/browse/REEF-1186)
Pull Request:
This closes #821
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/24e73feb
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/24e73feb
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/24e73feb
Branch: refs/heads/master
Commit: 24e73feb98c575669217360fbb507a325e0147f5
Parents: dd724ff
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Feb 4 00:00:25 2016 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Feb 4 15:34:25 2016 -0800
----------------------------------------------------------------------
.../group/impl/task/OperatorTopologyImpl.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/24e73feb/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
index 86e6f55..a5ac964 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
@@ -34,7 +34,6 @@ import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
-import javax.inject.Inject;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
@@ -62,7 +61,7 @@ public class OperatorTopologyImpl implements OperatorTopology {
private OperatorTopologyStruct baseTopology;
private OperatorTopologyStruct effectiveTopology;
- private final ResettingCountDownLatch topologyLockAquired = new ResettingCountDownLatch(1);
+ private final ResettingCountDownLatch topologyLockAcquired = new ResettingCountDownLatch(1);
private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler = new BaseTopologyUpdateHandler();
@@ -79,7 +78,6 @@ public class OperatorTopologyImpl implements OperatorTopology {
dataHandlingStageHandler,
10000);
- @Inject
public OperatorTopologyImpl(final Class<? extends Name<String>> groupName,
final Class<? extends Name<String>> operName, final String selfId,
final String driverId, final Sender sender, final int version) {
@@ -127,8 +125,8 @@ public class OperatorTopologyImpl implements OperatorTopology {
case UpdateTopology:
updatingTopo.set(true);
baseTopologyUpdateStage.onNext(msg);
- topologyLockAquired.awaitAndReset(1);
- LOG.finest(getQualifiedName() + "topoLockAquired CDL released. Resetting it to new CDL");
+ topologyLockAcquired.awaitAndReset(1);
+ LOG.finest(getQualifiedName() + "topoLockAcquired CDL released. Resetting it to new CDL");
sendAckToDriver(msg);
break;
@@ -329,7 +327,7 @@ public class OperatorTopologyImpl implements OperatorTopology {
throw new ParentDeadException(getQualifiedName()
+ "Parent dead. Current behavior is for the child to die too.");
} else {
- LOG.finest(getQualifiedName() + "Updating basetopology struct");
+ LOG.finest(getQualifiedName() + "Updating baseTopology struct");
baseTopology.update(msg);
sendAckToDriver(msg);
}
@@ -428,7 +426,7 @@ public class OperatorTopologyImpl implements OperatorTopology {
* Unlike Dead msgs this needs to be synchronized because data msgs are not
* routed through the base topo changes So we need to make sure to wait for
* updateTopo to complete and for the new effective topo to take effect. Hence
- * updatinTopo is set to false in refreshEffTopo. Also, since this is called
+ * updatingTopo is set to false in refreshEffTopo. Also, since this is called
* from a netty IO thread, we need to create a stage to move the msgs from
* netty space to application space and release the netty threads. Otherwise
* weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send
@@ -447,12 +445,12 @@ public class OperatorTopologyImpl implements OperatorTopology {
dataMsg});
LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
synchronized (topologyLock) {
- LOG.finest(getQualifiedName() + "Aqcuired topoLock");
+ LOG.finest(getQualifiedName() + "Acquired topoLock");
while (updatingTopo.get()) {
try {
LOG.finest(getQualifiedName() + "Topology is being updated. Released topoLock, Waiting on topoLock");
topologyLock.wait();
- LOG.finest(getQualifiedName() + "Aqcuired topoLock");
+ LOG.finest(getQualifiedName() + "Acquired topoLock");
} catch (final InterruptedException e) {
throw new RuntimeException("InterruptedException while data handling"
+ "stage was waiting for updatingTopo to become false", e);
@@ -480,8 +478,8 @@ public class OperatorTopologyImpl implements OperatorTopology {
LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
synchronized (topologyLock) {
LOG.finest(getQualifiedName() + "Acquired topoLock");
- LOG.finest(getQualifiedName() + "Releasing topoLoackAcquired CDL");
- topologyLockAquired.countDown();
+ LOG.finest(getQualifiedName() + "Releasing topoLockAcquired CDL");
+ topologyLockAcquired.countDown();
try {
updateBaseTopology();
LOG.finest(getQualifiedName() + "Completed updating base & effective topologies");