You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/03/06 02:55:33 UTC
[3/8] incubator-reef git commit: [REEF-118] Add Shimoga library for
elastic group communication.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
new file mode 100644
index 0000000..7483483
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EStage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+public class TaskNodeImpl implements TaskNode {
+
+ private static final Logger LOG = Logger.getLogger(TaskNodeImpl.class.getName());
+
+ private final EStage<GroupCommunicationMessage> senderStage;
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final String taskId;
+ private final String driverId;
+
+ private final boolean isRoot;
+ private TaskNode parent;
+ private TaskNode sibling;
+ private final List<TaskNode> children = new ArrayList<>();
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicBoolean topoSetupSent = new AtomicBoolean(false);
+
+ private final TaskNodeStatus taskNodeStatus;
+
+ private final AtomicInteger version = new AtomicInteger(0);
+
+ /**
+  * Creates the node that represents one task of one operator.
+  *
+  * @param senderStage stage to which outgoing group communication messages are handed
+  * @param groupName name class of the communication group
+  * @param operatorName name class of the operator
+  * @param taskId id of the task this node represents
+  * @param driverId id of the driver, used when building the TopologySetup message
+  * @param isRoot whether this node is the root; a root is treated as never having a parent
+  */
+ public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage,
+                     final Class<? extends Name<String>> groupName,
+                     final Class<? extends Name<String>> operatorName,
+                     final String taskId,
+                     final String driverId,
+                     final boolean isRoot) {
+   this.senderStage = senderStage;
+   this.driverId = driverId;
+   this.isRoot = isRoot;
+   this.groupName = groupName;
+   this.operName = operatorName;
+   this.taskId = taskId;
+   // The status tracker keeps a back-reference to this node.
+   this.taskNodeStatus = new TaskNodeStatusImpl(groupName, operatorName, taskId, this);
+ }
+
+ /**
+  * Records {@code leaf} as this node's sibling, i.e. the node later
+  * returned by {@link #successor()}.
+  *
+  * @param leaf the sibling node; stored as given
+  */
+ @Override
+ public void setSibling(final TaskNode leaf) {
+   final Object[] entryArgs = {getQualifiedName(), leaf};
+   LOG.entering("TaskNodeImpl", "setSibling", entryArgs);
+   this.sibling = leaf;
+   LOG.exiting("TaskNodeImpl", "setSibling", getQualifiedName());
+ }
+
+ /**
+  * @return how many children are currently registered on this node
+  */
+ @Override
+ public int getNumberOfChildren() {
+   LOG.entering("TaskNodeImpl", "getNumberOfChildren", getQualifiedName());
+   final int childCount = children.size();
+   LOG.exiting("TaskNodeImpl", "getNumberOfChildren", getQualifiedName() + childCount);
+   return childCount;
+ }
+
+ /**
+  * @return the sibling stored by {@link #setSibling(TaskNode)}, or {@code null} if none was set
+  */
+ @Override
+ public TaskNode successor() {
+   LOG.entering("TaskNodeImpl", "successor", getQualifiedName());
+   final TaskNode next = sibling;
+   LOG.exiting("TaskNodeImpl", "successor", getQualifiedName() + next);
+   return next;
+ }
+
+ /**
+  * @return the task id and the current version, formatted as {@code (taskId,version)}
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder();
+   text.append("(").append(taskId).append(",").append(version.get()).append(")");
+   return text.toString();
+ }
+
+ // *** Methods pertaining to my status change ***
+
+ /**
+  * Marks this node as failed and informs its neighbors.
+  * <p>
+  * If the node was not running, only log entries are written. Otherwise the
+  * status tracker is asked to clear its state and release its locks, the
+  * topology-setup flag is reset, a running parent is told that this child
+  * died, every running child is told that its parent died, and finally the
+  * version is incremented.
+  */
+ @Override
+ public void onFailedTask() {
+   LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName());
+   final boolean wasRunning = running.compareAndSet(true, false);
+   if (!wasRunning) {
+     LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
+     LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
+     return;
+   }
+   taskNodeStatus.clearStateAndReleaseLocks();
+   LOG.finest(getQualifiedName() + "Changed status to failed.");
+   LOG.finest(getQualifiedName() + "Resetting topoSetupSent to false");
+   topoSetupSent.set(false);
+   if (parent == null || !parent.isRunning()) {
+     LOG.finest(getQualifiedName() + "Skipping asking parent to process child death");
+   } else {
+     parent.onChildDead(taskId);
+   }
+   for (final TaskNode child : children) {
+     if (!child.isRunning()) {
+       continue;
+     }
+     child.onParentDead();
+   }
+   // The neighbors above were notified while this node still had its old version.
+   final int bumpedVersion = version.incrementAndGet();
+   LOG.finest(getQualifiedName() + "Bumping up to version-" + bumpedVersion);
+   LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName());
+ }
+
+ /**
+  * Marks this node as running and announces it to its running neighbors.
+  * <p>
+  * If the node already was running, only log entries are written. Otherwise,
+  * for a running parent a ParentAdd message is built, its acknowledgement is
+  * registered as expected, the message is sent and the parent is told that
+  * this child runs. For every running child the same is done with a ChildAdd
+  * message, after which the child is told that its parent runs.
+  */
+ @Override
+ public void onRunningTask() {
+   LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName());
+   final boolean wasStopped = running.compareAndSet(false, true);
+   if (!wasStopped) {
+     LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
+     LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
+     return;
+   }
+   final int myVersion = version.get();
+   LOG.finest(getQualifiedName() + "Changed status to running version-" + myVersion);
+   if (parent == null || !parent.isRunning()) {
+     LOG.finest(getQualifiedName() + "Skipping src add to & for parent");
+   } else {
+     final GroupCommunicationMessage parentAdd = Utils.bldVersionedGCM(groupName, operName,
+         ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd,
+         parent.getTaskId(), parent.getVersion(), taskId, myVersion, Utils.EmptyByteArr);
+     // Register the expected ack before the message leaves.
+     taskNodeStatus.expectAckFor(parentAdd.getType(), parentAdd.getSrcid());
+     senderStage.onNext(parentAdd);
+     parent.onChildRunning(taskId);
+   }
+   for (final TaskNode child : children) {
+     if (!child.isRunning()) {
+       continue;
+     }
+     final GroupCommunicationMessage childAdd = Utils.bldVersionedGCM(groupName, operName,
+         ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd,
+         child.getTaskId(), child.getVersion(), taskId, myVersion, Utils.EmptyByteArr);
+     taskNodeStatus.expectAckFor(childAdd.getType(), childAdd.getSrcid());
+     senderStage.onNext(childAdd);
+     child.onParentRunning();
+   }
+   LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName());
+ }
+
+ /**
+ * * Methods pertaining to my status change end; methods pertaining to my neighbors' status change begin ***
+ */
+
+ /**
+  * Reacts to the parent having started running.
+  * <p>
+  * When a parent is set and still running, a ParentAdd message is built, its
+  * acknowledgement is registered as expected and the message is sent.
+  * Otherwise nothing is sent and the situation is only logged.
+  */
+ @Override
+ public void onParentRunning() {
+   LOG.entering("TaskNodeImpl", "onParentRunning", getQualifiedName());
+   if (parent == null || !parent.isRunning()) {
+     LOG.finer(getQualifiedName() + "Parent was running when I was asked to add him."
+         + " However, he is not active anymore. Returning without sending ParentAdd" + " msg. ***CHECK***");
+   } else {
+     final int versionOfParent = parent.getVersion();
+     final String idOfParent = parent.getTaskId();
+     final GroupCommunicationMessage parentAdd = Utils.bldVersionedGCM(groupName, operName,
+         ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd,
+         idOfParent, versionOfParent, taskId, version.get(), Utils.EmptyByteArr);
+     taskNodeStatus.expectAckFor(parentAdd.getType(), parentAdd.getSrcid());
+     senderStage.onNext(parentAdd);
+   }
+   LOG.exiting("TaskNodeImpl", "onParentRunning", getQualifiedName());
+ }
+
+ /**
+  * Reacts to the parent having failed.
+  * <p>
+  * The parent's failure is recorded in the status tracker, then a ParentDead
+  * message is built, its acknowledgement is registered as expected and the
+  * message is sent.
+  *
+  * @throws RuntimeException if no parent is set
+  */
+ @Override
+ public void onParentDead() {
+   LOG.entering("TaskNodeImpl", "onParentDead", getQualifiedName());
+   if (parent == null) {
+     throw new RuntimeException(getQualifiedName() + "Don't expect parent to be null. Something wrong");
+   }
+   final int versionOfParent = parent.getVersion();
+   final String idOfParent = parent.getTaskId();
+   taskNodeStatus.updateFailureOf(parent.getTaskId());
+   final GroupCommunicationMessage parentDead = Utils.bldVersionedGCM(groupName, operName,
+       ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead,
+       idOfParent, versionOfParent, taskId, version.get(), Utils.EmptyByteArr);
+   taskNodeStatus.expectAckFor(parentDead.getType(), parentDead.getSrcid());
+   senderStage.onNext(parentDead);
+   LOG.exiting("TaskNodeImpl", "onParentDead", getQualifiedName());
+ }
+
+ /**
+  * Reacts to a child having started running.
+  * <p>
+  * The node is looked up among the parent and the children. When it is found
+  * and still running, a ChildAdd message is built, its acknowledgement is
+  * registered as expected and the message is sent; otherwise the situation
+  * is only logged.
+  *
+  * @param childId task id of the child that reported running
+  */
+ @Override
+ public void onChildRunning(final String childId) {
+   LOG.entering("TaskNodeImpl", "onChildRunning", new Object[]{getQualifiedName(), childId});
+   final TaskNode runningChild = findTask(childId);
+   if (runningChild == null || !runningChild.isRunning()) {
+     LOG.fine(getQualifiedName() + childId + " was running when I was asked to add him."
+         + " However, I can't find a task corresponding to him now."
+         + " Returning without sending ChildAdd msg. ***CHECK***");
+   } else {
+     final int versionOfChild = runningChild.getVersion();
+     final GroupCommunicationMessage childAdd = Utils.bldVersionedGCM(groupName, operName,
+         ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd,
+         childId, versionOfChild, taskId, version.get(), Utils.EmptyByteArr);
+     taskNodeStatus.expectAckFor(childAdd.getType(), childAdd.getSrcid());
+     senderStage.onNext(childAdd);
+   }
+   LOG.exiting("TaskNodeImpl", "onChildRunning", getQualifiedName() + childId);
+ }
+
+ /**
+  * Reacts to a child having failed.
+  * <p>
+  * The child's failure is recorded in the status tracker, then a ChildDead
+  * message is built, its acknowledgement is registered as expected and the
+  * message is sent.
+  *
+  * @param childId task id of the failed child
+  * @throws RuntimeException if no child with that id is registered
+  */
+ @Override
+ public void onChildDead(final String childId) {
+   LOG.entering("TaskNodeImpl", "onChildDead", new Object[]{getQualifiedName(), childId});
+   final TaskNode deadChild = findChildTask(childId);
+   if (deadChild == null) {
+     throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + " to be null. Something wrong");
+   }
+   final int versionOfChild = deadChild.getVersion();
+   taskNodeStatus.updateFailureOf(childId);
+   final GroupCommunicationMessage childDead = Utils.bldVersionedGCM(groupName, operName,
+       ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead,
+       childId, versionOfChild, taskId, version.get(), Utils.EmptyByteArr);
+   taskNodeStatus.expectAckFor(childDead.getType(), childDead.getSrcid());
+   senderStage.onNext(childDead);
+   LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId);
+ }
+
+ /**
+ * * Methods pertaining to my neighbors' status change end ***
+ */
+
+ /**
+  * Forwards an acknowledgement message to the status tracker for processing.
+  *
+  * @param msg the received message
+  */
+ @Override
+ public void onReceiptOfAcknowledgement(final GroupCommunicationMessage msg) {
+   final Object[] entryArgs = {getQualifiedName(), msg};
+   LOG.entering("TaskNodeImpl", "onReceiptOfAcknowledgement", entryArgs);
+   taskNodeStatus.processAcknowledgement(msg);
+   LOG.exiting("TaskNodeImpl", "onReceiptOfAcknowledgement", getQualifiedName() + msg);
+ }
+
+ /**
+  * Tells the status tracker that a topology update for this node is starting.
+  */
+ @Override
+ public void updatingTopology() {
+   final String qualifiedName = getQualifiedName();
+   LOG.entering("TaskNodeImpl", "updatingTopology", qualifiedName);
+   taskNodeStatus.updatingTopology();
+   LOG.exiting("TaskNodeImpl", "updatingTopology", getQualifiedName());
+ }
+
+ /**
+  * @return the id of the task this node represents
+  */
+ @Override
+ public String getTaskId() {
+   return this.taskId;
+ }
+
+ /**
+  * Appends {@code child} to this node's list of children.
+  *
+  * @param child the node to register as a child; must not be {@code null}
+  */
+ @Override
+ public void addChild(final TaskNode child) {
+   final Object[] entryArgs = {getQualifiedName(), child.getTaskId()};
+   LOG.entering("TaskNodeImpl", "addChild", entryArgs);
+   this.children.add(child);
+   LOG.exiting("TaskNodeImpl", "addChild", getQualifiedName() + child);
+ }
+
+ /**
+  * Removes {@code child} from this node's list of children. The list removal
+  * matches by {@code equals}.
+  *
+  * @param child the node to unregister; must not be {@code null}
+  */
+ @Override
+ public void removeChild(final TaskNode child) {
+   final Object[] entryArgs = {getQualifiedName(), child.getTaskId()};
+   LOG.entering("TaskNodeImpl", "removeChild", entryArgs);
+   this.children.remove(child);
+   LOG.exiting("TaskNodeImpl", "removeChild", getQualifiedName() + child);
+ }
+
+ /**
+  * Stores {@code parent} as this node's parent.
+  *
+  * @param parent the new parent; stored as given
+  */
+ @Override
+ public void setParent(final TaskNode parent) {
+   final Object[] entryArgs = {getQualifiedName(), parent};
+   LOG.entering("TaskNodeImpl", "setParent", entryArgs);
+   this.parent = parent;
+   LOG.exiting("TaskNodeImpl", "setParent", getQualifiedName() + parent);
+ }
+
+ /**
+  * @return the current value of the running flag
+  */
+ @Override
+ public boolean isRunning() {
+   LOG.entering("TaskNodeImpl", "isRunning", getQualifiedName());
+   final boolean currentlyRunning = running.get();
+   LOG.exiting("TaskNodeImpl", "isRunning", getQualifiedName() + currentlyRunning);
+   return currentlyRunning;
+ }
+
+ /**
+  * @return the parent stored by {@link #setParent(TaskNode)}, or {@code null} if none is set
+  */
+ @Override
+ public TaskNode getParent() {
+   LOG.entering("TaskNodeImpl", "getParent", getQualifiedName());
+   final TaskNode currentParent = parent;
+   LOG.exiting("TaskNodeImpl", "getParent", getQualifiedName() + currentParent);
+   return currentParent;
+ }
+
+ private String getQualifiedName() {
+ return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + getVersion() + ") - ";
+ }
+
+ /**
+  * Asks the status tracker whether the given neighbor counts as active.
+  *
+  * @param neighborId task id of the neighbor
+  * @return the status tracker's answer
+  */
+ @Override
+ public boolean isNeighborActive(final String neighborId) {
+   final Object[] entryArgs = {getQualifiedName(), neighborId};
+   LOG.entering("TaskNodeImpl", "isNeighborActive", entryArgs);
+   final boolean neighborIsActive = taskNodeStatus.isActive(neighborId);
+   LOG.exiting("TaskNodeImpl", "isNeighborActive", getQualifiedName() + neighborIsActive);
+   return neighborIsActive;
+ }
+
+ /**
+  * Atomically clears the "topology setup sent" flag if it was set.
+  *
+  * @return {@code true} if the flag was set and has now been cleared,
+  *         {@code false} if it was already clear
+  */
+ @Override
+ public boolean resetTopologySetupSent() {
+   final Object[] entryArgs = {getQualifiedName()};
+   LOG.entering("TaskNodeImpl", "resetTopologySetupSent", entryArgs);
+   final boolean wasSent = topoSetupSent.compareAndSet(true, false);
+   LOG.exiting("TaskNodeImpl", "resetTopologySetupSent", getQualifiedName() + wasSent);
+   return wasSent;
+ }
+
+ /**
+  * Sends the TopologySetup message if it has not been sent yet and the links
+  * to the parent and to all running children are active in both directions.
+  * The checks are evaluated in order and stop at the first one that fails.
+  */
+ @Override
+ public void checkAndSendTopologySetupMessage() {
+   LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
+   final boolean readyToSend = !topoSetupSent.get()
+       && parentActive() && activeNeighborOfParent()
+       && allChildrenActive() && activeNeighborOfAllChildren();
+   if (readyToSend) {
+     sendTopoSetupMsg();
+   }
+   LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
+ }
+
+ /**
+  * Builds and sends the TopologySetup message for this task, notifies the
+  * status tracker, and sets the "topology setup sent" flag. If the flag was
+  * already set, that is logged.
+  */
+ private void sendTopoSetupMsg() {
+   LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId);
+   LOG.fine(getQualifiedName() + "is an active participant in the topology");
+   final GroupCommunicationMessage topoSetup = Utils.bldVersionedGCM(groupName, operName,
+       ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup,
+       driverId, 0, taskId, version.get(), Utils.EmptyByteArr);
+   senderStage.onNext(topoSetup);
+   taskNodeStatus.onTopologySetupMessageSent();
+   if (!topoSetupSent.compareAndSet(false, true)) {
+     LOG.fine(getQualifiedName() + "TopologySetup msg was sent more than once. Something fishy!!!");
+   }
+   LOG.exiting("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName());
+ }
+
+ /**
+  * Looks up the neighbor with the given id among the parent and the children
+  * and, if one is found, asks it to check and send its TopologySetup message.
+  *
+  * @param source task id of the neighbor
+  */
+ @Override
+ public void checkAndSendTopologySetupMessageFor(final String source) {
+   final Object[] entryArgs = {getQualifiedName(), source};
+   LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", entryArgs);
+   final TaskNode neighbor = findTask(source);
+   if (neighbor != null) {
+     neighbor.checkAndSendTopologySetupMessage();
+   }
+   LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", getQualifiedName() + source);
+ }
+
+ /**
+  * Finds the neighbor with the given task id. The parent is checked first,
+  * then the children.
+  *
+  * @param sourceId task id to look for
+  * @return the parent or the child with that id, or {@code null} if there is none
+  */
+ private TaskNode findTask(final String sourceId) {
+   LOG.entering("TaskNodeImpl", "findTask", new Object[]{getQualifiedName(), sourceId});
+   final boolean isParent = parent != null && parent.getTaskId().equals(sourceId);
+   final TaskNode found = isParent ? parent : findChildTask(sourceId);
+   LOG.exiting("TaskNodeImpl", "findTask", getQualifiedName() + found);
+   return found;
+ }
+
+ private TaskNode findChildTask(final String sourceId) {
+ LOG.entering("TaskNodeImpl", "findChildTask", new Object[]{getQualifiedName(), sourceId});
+ TaskNode retNode = null;
+ for (final TaskNode child : children) {
+ if (child.getTaskId().equals(sourceId)) {
+ retNode = child;
+ break;
+ }
+ }
+ LOG.exiting("TaskNodeImpl", "findChildTask", getQualifiedName() + retNode);
+ return retNode;
+ }
+
+ private boolean parentActive() {
+ LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName());
+ if (isRoot) {
+ LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+ return true;
+ }
+ if (isNeighborActive(parent.getTaskId())) {
+ LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neghbor"}));
+ return true;
+ }
+ LOG.exiting("TaskNodeImpl", "parentActive", getQualifiedName() + "Neither root Nor is " + parent + " an active neghbor");
+ return false;
+ }
+
+ private boolean activeNeighborOfParent() {
+ LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName());
+ if (isRoot) {
+ LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+ return true;
+ }
+ if (parent.isNeighborActive(taskId)) {
+ LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am an active neighbor of parent ", parent}));
+ return true;
+ }
+ LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), "Neither is parent null Nor am I an active neighbor of parent ", parent}));
+ return false;
+ }
+
+ /**
+  * Tells whether every running child is recorded as an active neighbor of
+  * this node. Children that are not running are ignored.
+  *
+  * @return {@code false} as soon as a running child is found that is not yet active,
+  *         {@code true} otherwise
+  */
+ private boolean allChildrenActive() {
+   LOG.entering("TaskNodeImpl", "allChildrenActive", getQualifiedName());
+   for (final TaskNode child : children) {
+     final String idOfChild = child.getTaskId();
+     if (!child.isRunning()) {
+       continue;
+     }
+     if (!isNeighborActive(idOfChild)) {
+       LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{false, getQualifiedName(), idOfChild, " not active yet"}));
+       return false;
+     }
+   }
+   LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
+   return true;
+ }
+
+ /**
+  * Tells whether every running child regards this node as an active
+  * neighbor. Children that are not running are ignored.
+  *
+  * @return {@code false} as soon as a running child is found that does not report this
+  *         node as active, {@code true} otherwise
+  */
+ private boolean activeNeighborOfAllChildren() {
+   LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName());
+   for (final TaskNode child : children) {
+     if (!child.isRunning()) {
+       continue;
+     }
+     if (!child.isNeighborActive(taskId)) {
+       LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
+       return false;
+     }
+   }
+   LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
+   return true;
+ }
+
+ /**
+  * Delegates to the status tracker's {@code waitForTopologySetup()} and
+  * returns when that call returns.
+  */
+ @Override
+ public void waitForTopologySetupOrFailure() {
+   final String qualifiedName = getQualifiedName();
+   LOG.entering("TaskNodeImpl", "waitForTopologySetupOrFailure", qualifiedName);
+   taskNodeStatus.waitForTopologySetup();
+   LOG.exiting("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
+ }
+
+ /**
+  * @return whatever the status tracker reports from its {@code hasChanges()}
+  */
+ @Override
+ public boolean hasChanges() {
+   LOG.entering("TaskNodeImpl", "hasChanges", getQualifiedName());
+   final boolean pendingChanges = taskNodeStatus.hasChanges();
+   LOG.exiting("TaskNodeImpl", "hasChanges", getQualifiedName() + pendingChanges);
+   return pendingChanges;
+ }
+
+ /**
+  * @return the current version counter; it is incremented each time the task fails
+  */
+ @Override
+ public int getVersion() {
+   return this.version.get();
+ }
+
+ /**
+  * Combines the task id's hash with the current version, matching the fields
+  * compared by {@link #equals(Object)}. Because the version is mutable, the
+  * hash of a node changes when its version changes.
+  */
+ @Override
+ public int hashCode() {
+   return 31 * taskId.hashCode() + version.get();
+ }
+
+ /**
+  * Two nodes are equal when both are {@code TaskNodeImpl} instances with the
+  * same task id and the same current version.
+  *
+  * @param obj the object to compare with
+  * @return {@code true} if {@code obj} is this node or an equal one
+  */
+ @Override
+ public boolean equals(final Object obj) {
+   if (obj == this) {
+     return true;
+   }
+   if (!(obj instanceof TaskNodeImpl)) {
+     return false;
+   }
+   final TaskNodeImpl other = (TaskNodeImpl) obj;
+   return this.taskId.equals(other.taskId) && this.version.get() == other.version.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
new file mode 100644
index 0000000..3f904e2
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap;
+import org.apache.reef.io.network.group.impl.utils.CountingMap;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class TaskNodeStatusImpl implements TaskNodeStatus {
+
+ private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName());
+
+ private final ConcurrentCountingMap<Type, String> statusMap = new ConcurrentCountingMap<>();
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final String taskId;
+ private final Set<String> activeNeighbors = new HashSet<>();
+ private final CountingMap<String> neighborStatus = new CountingMap<>();
+ private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
+ private final Object topoUpdateStageLock = new Object();
+ private final Object topoSetupSentLock = new Object();
+ private final TaskNode node;
+
+ /**
+  * Creates the status tracker for one task node.
+  *
+  * @param groupName name class of the communication group (used in log prefixes)
+  * @param operName name class of the operator (used in log prefixes)
+  * @param taskId id of the task whose status is tracked
+  * @param node the node this tracker belongs to; it is called back to check and
+  *             send TopologySetup messages
+  */
+ public TaskNodeStatusImpl(final Class<? extends Name<String>> groupName,
+                           final Class<? extends Name<String>> operName,
+                           final String taskId,
+                           final TaskNode node) {
+   this.node = node;
+   this.taskId = taskId;
+   this.groupName = groupName;
+   this.operName = operName;
+ }
+
+ /**
+  * @param msgAcked the message type to classify
+  * @return {@code true} for {@code ParentDead} and {@code ChildDead}
+  */
+ private boolean isDeadMsg(final Type msgAcked) {
+   if (msgAcked == Type.ParentDead) {
+     return true;
+   }
+   return msgAcked == Type.ChildDead;
+ }
+
+ /**
+  * @param msgAcked the message type to classify
+  * @return {@code true} for {@code ParentAdd} and {@code ChildAdd}
+  */
+ private boolean isAddMsg(final Type msgAcked) {
+   if (msgAcked == Type.ParentAdd) {
+     return true;
+   }
+   return msgAcked == Type.ChildAdd;
+ }
+
+ private Type getAckedMsg(final Type msgType) {
+ switch (msgType) {
+ case ParentAdded:
+ return Type.ParentAdd;
+ case ChildAdded:
+ return Type.ChildAdd;
+ case ParentRemoved:
+ return Type.ParentDead;
+ case ChildRemoved:
+ return Type.ChildDead;
+ default:
+ return msgType;
+ }
+ }
+
+ /**
+  * When no acknowledgements are outstanding any more, asks the node to check
+  * whether its TopologySetup message can be sent.
+  *
+  * @param msgDealt the message type that was just handled; used for logging only
+  */
+ private void chkIamActiveToSendTopoSetup(final Type msgDealt) {
+   // Trace under this method's real name so entering/exiting records can be matched to the code.
+   LOG.entering("TaskNodeStatusImpl", "chkIamActiveToSendTopoSetup", new Object[]{getQualifiedName(), msgDealt});
+   if (statusMap.isEmpty()) {
+     LOG.finest(getQualifiedName() + "Empty status map.");
+     node.checkAndSendTopologySetupMessage();
+   } else {
+     LOG.finest(getQualifiedName() + "Status map non-empty" + statusMap);
+   }
+   LOG.exiting("TaskNodeStatusImpl", "chkIamActiveToSendTopoSetup", getQualifiedName() + msgDealt);
+ }
+
+ /**
+  * Called after the TopologySetup message has been sent: forgets the
+  * per-neighbor counts and wakes every thread waiting on
+  * {@code topoSetupSentLock}.
+  */
+ @Override
+ public void onTopologySetupMessageSent() {
+   final String qualifiedName = getQualifiedName();
+   LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", qualifiedName);
+   neighborStatus.clear();
+   synchronized (topoSetupSentLock) {
+     topoSetupSentLock.notifyAll();
+   }
+   LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
+ }
+
+ /**
+  * @param neighborId task id of the neighbor
+  * @return whether {@code neighborId} is in the set of active neighbors
+  */
+ @Override
+ public boolean isActive(final String neighborId) {
+   final Object[] entryArgs = {getQualifiedName(), neighborId};
+   LOG.entering("TaskNodeStatusImpl", "isActive", entryArgs);
+   final boolean active = activeNeighbors.contains(neighborId);
+   LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + active);
+   return active;
+ }
+
+ /**
+  * Records that an acknowledgement for a message of type {@code msgType} is
+  * expected from {@code srcId}.
+  * <p>
+  * This has to run in line rather than in a stage: the messages sent to the
+  * tasks must be noted before any messages coming back from the nodes (acks
+  * and topology messages) are processed.
+  *
+  * @param msgType type of the message that was sent
+  * @param srcId id recorded under that type in the status map
+  */
+ @Override
+ public void expectAckFor(final Type msgType, final String srcId) {
+   final Object[] entryArgs = {getQualifiedName(), msgType, srcId};
+   LOG.entering("TaskNodeStatusImpl", "expectAckFor", entryArgs);
+   LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources");
+   statusMap.add(msgType, srcId);
+   final String exitMsg = getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType);
+   LOG.exiting("TaskNodeStatusImpl", "expectAckFor", exitMsg);
+ }
+
+ /**
+  * Forgets all tracked state (expected acks, active neighbors, per-neighbor
+  * counts), clears the "updating topology" flag, and wakes every thread
+  * waiting on either of the two lock objects.
+  */
+ @Override
+ public void clearStateAndReleaseLocks() {
+   final String qualifiedName = getQualifiedName();
+   LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", qualifiedName);
+   statusMap.clear();
+   activeNeighbors.clear();
+   neighborStatus.clear();
+   // Only has an effect if an update was in progress.
+   updatingTopo.compareAndSet(true, false);
+   synchronized (topoSetupSentLock) {
+     topoSetupSentLock.notifyAll();
+   }
+   // Waiters in waitForTopologySetup() re-check their condition once notified.
+   synchronized (topoUpdateStageLock) {
+     topoUpdateStageLock.notifyAll();
+   }
+   LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
+ }
+
+ /**
+  * Records the failure of a neighbor by dropping it from the set of active
+  * neighbors and from the per-neighbor counts.
+  *
+  * @param taskId id of the failed neighbor (shadows this tracker's own {@code taskId} field)
+  */
+ @Override
+ public void updateFailureOf(final String taskId) {
+   final Object[] entryArgs = {getQualifiedName(), taskId};
+   LOG.entering("TaskNodeStatusImpl", "updateFailureOf", entryArgs);
+   activeNeighbors.remove(taskId);
+   neighborStatus.remove(taskId);
+   LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName());
+ }
+
+ /**
+  * Handles a message received back for this node.
+  * <p>
+  * A {@code TopologySetup} message clears the "updating topology" flag and
+  * wakes the threads waiting in {@link #waitForTopologySetup()}. The four
+  * neighbor acknowledgements (ParentAdded, ChildAdded, ParentRemoved,
+  * ChildRemoved) are passed on for neighbor bookkeeping. Any other type is
+  * logged and ignored.
+  *
+  * @param gcm the received message; its destination id is treated as the
+  *            neighbor the acknowledgement is about
+  */
+ @Override
+ public void processAcknowledgement(final GroupCommunicationMessage gcm) {
+   // Trace under this method's real name so entering/exiting records can be matched to the code.
+   LOG.entering("TaskNodeStatusImpl", "processAcknowledgement", new Object[]{getQualifiedName(), gcm});
+   final Type msgType = gcm.getType();
+   final Type msgAcked = getAckedMsg(msgType);
+   final String sourceId = gcm.getDestid();
+   switch (msgType) {
+     case TopologySetup:
+       synchronized (topoUpdateStageLock) {
+         if (!updatingTopo.compareAndSet(true, false)) {
+           LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false");
+         }
+         topoUpdateStageLock.notifyAll();
+       }
+       break;
+     case ParentAdded:
+     case ChildAdded:
+     case ParentRemoved:
+     case ChildRemoved:
+       processNeighborAcks(gcm, msgType, msgAcked, sourceId);
+       break;
+
+     default:
+       LOG.fine(getQualifiedName() + "Non ACK msg " + gcm.getType() + " for " + gcm.getDestid() + " unexpected");
+       break;
+   }
+   LOG.exiting("TaskNodeStatusImpl", "processAcknowledgement", getQualifiedName());
+ }
+
+ /**
+  * Handles an acknowledgement for a neighbor add/dead message.
+  * <p>
+  * If the acknowledgement was expected, it is removed from the status map,
+  * the neighbor's count is updated, and both the neighbor and this node are
+  * checked for readiness to send TopologySetup. Unexpected acknowledgements
+  * are only logged.
+  *
+  * @param gcm the received message (used for logging)
+  * @param msgType type of the received acknowledgement
+  * @param msgAcked type of the message being acknowledged
+  * @param sourceId id of the neighbor the acknowledgement is about
+  */
+ private void processNeighborAcks(final GroupCommunicationMessage gcm, final Type msgType, final Type msgAcked,
+                                  final String sourceId) {
+   LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
+   if (statusMap.containsKey(msgAcked)) {
+     if (statusMap.contains(msgAcked, sourceId)) {
+       statusMap.remove(msgAcked, sourceId);
+       updateNeighborStatus(msgAcked, sourceId);
+       checkNeighborActiveToSendTopoSetup(sourceId);
+       chkIamActiveToSendTopoSetup(msgAcked);
+     } else {
+       // Name the message type that was actually expected; it is not always ChildAdd.
+       LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + msgType + " from a source(" + sourceId
+           + ") to whom " + msgAcked + " was not sent. "
+           + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
+     }
+   } else {
+     LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + msgAcked
+         + " msgs sent in the previous update cycle. "
+         + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
+   }
+   LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
+ }
+
+ private void checkNeighborActiveToSendTopoSetup(final String sourceId) {
+ LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(),
+ sourceId});
+ if (statusMap.notContains(sourceId)) {
+ //All msgs corresponding to sourceId have been ACKed
+ if (neighborStatus.get(sourceId) > 0) {
+ activeNeighbors.add(sourceId);
+ node.checkAndSendTopologySetupMessageFor(sourceId);
+ } else {
+ LOG.finest(getQualifiedName() + sourceId + " is not a neighbor anymore");
+ }
+ } else {
+ LOG.finest(getQualifiedName() + "Not done processing " + sourceId + " acks yet. So it is still inactive");
+ }
+ LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + sourceId);
+ }
+
+ private void updateNeighborStatus(final Type msgAcked, final String sourceId) {
+ LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
+ if (isAddMsg(msgAcked)) {
+ neighborStatus.add(sourceId);
+ } else if (isDeadMsg(msgAcked)) {
+ neighborStatus.remove(sourceId);
+ } else {
+ throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + msgAcked + " from " + sourceId);
+ }
+ LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
+ }
+
+ /**
+  * Sets the "updating topology" flag.
+  *
+  * @throws RuntimeException if the flag was already set
+  */
+ @Override
+ public void updatingTopology() {
+   LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
+   if (!updatingTopo.compareAndSet(false, true)) {
+     throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true");
+   }
+   LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
+ }
+
+ private String getQualifiedName() {
+ return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + node.getVersion() + ") - ";
+ }
+
+ /**
+  * @return {@code true} while at least one acknowledgement is still expected,
+  *         i.e. the status map is not empty
+  */
+ @Override
+ public boolean hasChanges() {
+   LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName());
+   final boolean acksOutstanding = !statusMap.isEmpty();
+   LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + acksOutstanding);
+   return acksOutstanding;
+ }
+
+ /**
+  * Blocks the calling thread while a topology update is in progress and the
+  * owning node is running. The wait ends when the "updating topology" flag
+  * is cleared or the node stops running, and a notification on
+  * {@code topoUpdateStageLock} lets the loop re-check.
+  *
+  * @throws RuntimeException if the thread is interrupted while waiting; the
+  *         thread's interrupt flag is restored before the exception is thrown
+  */
+ @Override
+ public void waitForTopologySetup() {
+   LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName());
+   LOG.finest("Waiting to acquire topoUpdateStageLock");
+   synchronized (topoUpdateStageLock) {
+     LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. updatingTopo: " + updatingTopo.get());
+     // Loop to guard against spurious wake-ups and notifications for other reasons.
+     while (updatingTopo.get() && node.isRunning()) {
+       try {
+         LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock");
+         topoUpdateStageLock.wait();
+       } catch (final InterruptedException e) {
+         // Preserve the interrupt for callers further up the stack.
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage "
+             + "while waiting for receiving TopologySetup", e);
+       }
+     }
+   }
+   LOG.exiting("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
new file mode 100644
index 0000000..8404d89
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+public enum TaskState {
+ NOT_STARTED, RUNNING, FAILED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
new file mode 100644
index 0000000..228cf4b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler that forwards evaluator failures to a communication group driver.
+ * When the failed evaluator carries a failed task, that task is first reported as
+ * failed and then removed from the group; otherwise nothing is forwarded.
+ */
+public class TopologyFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+
+ private static final Logger LOG = Logger.getLogger(TopologyFailedEvaluatorHandler.class.getName());
+
+ /** Group driver that is told about the failed task. */
+ private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+ /**
+ * @param communicationGroupDriverImpl group driver to notify about failed tasks
+ */
+ public TopologyFailedEvaluatorHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+ this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+ }
+
+ /**
+ * Reports the task of a failed evaluator, if there is one, to the group driver.
+ *
+ * @param failedEvaluator the evaluator failure event
+ */
+ @Override
+ public void onNext(final FailedEvaluator failedEvaluator) {
+ final String evaluatorId = failedEvaluator.getId();
+ LOG.entering("TopologyFailedEvaluatorHandler", "onNext", evaluatorId);
+ final boolean hasFailedTask = failedEvaluator.getFailedTask().isPresent();
+ if (hasFailedTask) {
+ final String taskId = failedEvaluator.getFailedTask().get().getId();
+ LOG.finest("Failed Evaluator contains a failed task: " + taskId);
+ // Order matters: mark the task as failed before removing it from the group.
+ this.communicationGroupDriverImpl.failTask(taskId);
+ this.communicationGroupDriverImpl.removeTask(taskId);
+ }
+ LOG.exiting("TopologyFailedEvaluatorHandler", "onNext", evaluatorId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
new file mode 100644
index 0000000..a7ce7f7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler that reports a failed task to a communication group driver.
+ */
+public class TopologyFailedTaskHandler implements EventHandler<FailedTask> {
+
+ private static final Logger LOG = Logger.getLogger(TopologyFailedTaskHandler.class.getName());
+
+ /** Group driver that is notified of the failure. */
+ private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+ /**
+ * @param communicationGroupDriverImpl group driver to notify when a task fails
+ */
+ public TopologyFailedTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+ this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+ }
+
+ /**
+ * Passes the id of the failed task to {@code failTask} on the group driver.
+ *
+ * @param failedTask the task failure event
+ */
+ @Override
+ public void onNext(final FailedTask failedTask) {
+ final String taskId = failedTask.getId();
+ LOG.entering("TopologyFailedTaskHandler", "onNext", taskId);
+ this.communicationGroupDriverImpl.failTask(taskId);
+ LOG.exiting("TopologyFailedTaskHandler", "onNext", taskId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
new file mode 100644
index 0000000..e651aa7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler that hands incoming group communication messages to a
+ * communication group driver for processing.
+ */
+public class TopologyMessageHandler implements EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG = Logger.getLogger(TopologyMessageHandler.class.getName());
+
+ /** Group driver that processes the messages. */
+ private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+ /**
+ * @param communicationGroupDriverImpl group driver that will process received messages
+ */
+ public TopologyMessageHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+ this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+ }
+
+ /**
+ * Delegates the message to {@code processMsg} on the group driver.
+ *
+ * @param msg the received group communication message
+ */
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ LOG.entering("TopologyMessageHandler", "onNext", msg);
+ this.communicationGroupDriverImpl.processMsg(msg);
+ LOG.exiting("TopologyMessageHandler", "onNext", msg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
new file mode 100644
index 0000000..d6e0fae
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler that reports a running task to a communication group driver.
+ */
+public class TopologyRunningTaskHandler implements EventHandler<RunningTask> {
+
+ private static final Logger LOG = Logger.getLogger(TopologyRunningTaskHandler.class.getName());
+
+ /** Group driver that is notified when a task is running. */
+ private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+ /**
+ * @param communicationGroupDriverImpl group driver to notify about running tasks
+ */
+ public TopologyRunningTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+ this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+ }
+
+ /**
+ * Passes the id of the running task to {@code runTask} on the group driver.
+ *
+ * @param runningTask the running task event
+ */
+ @Override
+ public void onNext(final RunningTask runningTask) {
+ final String taskId = runningTask.getId();
+ LOG.entering("TopologyRunningTaskHandler", "onNext", taskId);
+ this.communicationGroupDriverImpl.runTask(taskId);
+ LOG.exiting("TopologyRunningTaskHandler", "onNext", taskId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
new file mode 100644
index 0000000..3905d47
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Waits for a list of task nodes to reach topology setup (or to fail) and then
+ * sends a TopologyUpdated message to a fixed destination task.
+ * <p>
+ * The handler will wait for all nodes to acquire topoLock
+ * and send TopologySetup msg. Then it will send TopologyUpdated
+ * msg. However, any local topology changes are not in effect
+ * till driver sends TopologySetup once statusMap is emptied.
+ * The operations in the tasks that have topology changes will
+ * wait for this. However, other tasks that do not have any changes
+ * will continue their regular operation.
+ */
+public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {
+
+ private static final Logger LOG = Logger.getLogger(TopologyUpdateWaitHandler.class.getName());
+ private final EStage<GroupCommunicationMessage> senderStage;
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final String driverId;
+ private final int driverVersion;
+ private final String dstId;
+ private final int dstVersion;
+ private final String qualifiedName;
+
+
+ /**
+ * @param senderStage stage the TopologyUpdated message is handed to
+ * @param groupName name class of the communication group
+ * @param operName name class of the operator
+ * @param driverId id placed in the message as the driver's id
+ * @param driverVersion version placed in the message alongside the driver id
+ * @param dstId id of the task that receives the TopologyUpdated message
+ * @param dstVersion version of the destination task
+ * @param qualifiedName prefix used for all log messages of this handler
+ */
+ public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
+ final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName,
+ final String driverId, final int driverVersion, final String dstId, final int dstVersion,
+ final String qualifiedName) {
+ this.senderStage = senderStage;
+ this.groupName = groupName;
+ this.operName = operName;
+ this.driverId = driverId;
+ this.driverVersion = driverVersion;
+ this.dstId = dstId;
+ this.dstVersion = dstVersion;
+ this.qualifiedName = qualifiedName;
+ }
+
+
+ /**
+ * Waits on every node in turn, logs whether it is running or has failed, and
+ * afterwards sends a single TopologyUpdated message with an empty payload to
+ * the destination task. The message is sent even if some nodes have failed.
+ *
+ * @param nodes the nodes that were asked to update their topology
+ */
+ @Override
+ public void onNext(final List<TaskNode> nodes) {
+ LOG.entering("TopologyUpdateWaitHandler", "onNext", new Object[]{qualifiedName, nodes});
+
+ for (final TaskNode node : nodes) {
+ LOG.fine(qualifiedName + "Waiting for " + node + " to enter TopologyUpdate phase");
+ // NOTE(review): presumably blocks until the node has set up or failed - confirm in TaskNode.
+ node.waitForTopologySetupOrFailure();
+ if (node.isRunning()) {
+ LOG.fine(qualifiedName + node + " is in TopologyUpdate phase");
+ } else {
+ LOG.fine(qualifiedName + node + " has failed");
+ }
+ }
+ LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes have received TopologySetup");
+ LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
+ + dstId + "," + dstVersion + ")");
+ senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
+ dstVersion, Utils.EmptyByteArr));
+ LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
new file mode 100644
index 0000000..b1695a9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.Topology;
+import org.apache.reef.io.network.group.impl.GroupChangesCodec;
+import org.apache.reef.io.network.group.impl.GroupChangesImpl;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
+import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
+import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
+import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.logging.Logger;
+
+/**
+ * Implements a tree topology with the specified Fan Out
+ */
+public class TreeTopology implements Topology {
+
+ private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName());
+
+ // Stage that outgoing group communication messages are handed to.
+ private final EStage<GroupCommunicationMessage> senderStage;
+ // Name classes of the group and operator; used for message building and the log prefix.
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ // Driver id passed to task nodes and into the messages built by this class.
+ private final String driverId;
+ // Id of the task designated as root (setRootTask); decides root vs. child in addTask/removeTask.
+ private String rootId;
+ // Operator specification (setOperatorSpecification); read when building task configurations.
+ private OperatorSpec operatorSpec;
+
+ // Root node of the tree; created in setRootNode.
+ private TaskNode root;
+ // Node that currently receives new children; advanced to its successor once it has fanOut children.
+ private TaskNode logicalRoot;
+ // Most recently added node; the next node linked into the tree becomes its sibling.
+ private TaskNode prev;
+ // Number of children a node takes before new children go to the next node.
+ private final int fanOut;
+
+ // All known nodes, root included, keyed by task id.
+ private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>();
+ // Used to render a task configuration as text for logging.
+ private final ConfigurationSerializer confSer = new AvroConfigurationSerializer();
+
+
+ /**
+ * Creates an empty tree topology; nodes are attached later through addTask.
+ *
+ * @param senderStage stage that outgoing group communication messages are handed to
+ * @param groupName name class of the communication group
+ * @param operatorName name class of the operator this topology belongs to
+ * @param driverId id of the driver, passed on to task nodes and messages
+ * @param numberOfTasks not used by this constructor
+ * @param fanOut number of children a node takes before the next node is filled
+ */
+ public TreeTopology(final EStage<GroupCommunicationMessage> senderStage,
+ final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName,
+ final String driverId, final int numberOfTasks, final int fanOut) {
+ this.fanOut = fanOut;
+ this.driverId = driverId;
+ this.senderStage = senderStage;
+ // The two names must be assigned before getQualifiedName() is used below.
+ this.groupName = groupName;
+ this.operName = operatorName;
+ LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut);
+ }
+
+ /**
+ * Records the id of the task that is to act as root. Only the id is stored
+ * here; the root node itself is created when that task is added.
+ *
+ * @param rootId id of the root task
+ */
+ @Override
+ public void setRootTask(final String rootId) {
+ final Object[] entryParams = {getQualifiedName(), rootId};
+ LOG.entering("TreeTopology", "setRootTask", entryParams);
+ this.rootId = rootId;
+ final String exitResult = getQualifiedName() + rootId;
+ LOG.exiting("TreeTopology", "setRootTask", exitResult);
+ }
+
+ /**
+ * @return the id recorded by setRootTask, or null if none has been set
+ */
+ @Override
+ public String getRootId() {
+ LOG.entering("TreeTopology", "getRootId", getQualifiedName());
+ final String exitResult = getQualifiedName() + rootId;
+ LOG.exiting("TreeTopology", "getRootId", exitResult);
+ return this.rootId;
+ }
+
+ /**
+ * Stores the operator specification that getTaskConfiguration reads later.
+ *
+ * @param spec the operator specification
+ */
+ @Override
+ public void setOperatorSpecification(final OperatorSpec spec) {
+ final Object[] entryParams = {getQualifiedName(), spec};
+ LOG.entering("TreeTopology", "setOperSpec", entryParams);
+ this.operatorSpec = spec;
+ final String exitResult = getQualifiedName() + spec;
+ LOG.exiting("TreeTopology", "setOperSpec", exitResult);
+ }
+
+ /**
+ * Builds the configuration that sets up this operator on the given task.
+ * The data codec and the node's current version are always bound. For a
+ * broadcast spec the task is bound as sender when its id equals the spec's
+ * sender id and as receiver otherwise. For a reduce spec the reduce function
+ * is bound, and the task is bound as receiver when its id equals the spec's
+ * receiver id and as sender otherwise. For any other spec type no operator
+ * implementation is bound.
+ *
+ * @param taskId id of a task that has been added to this topology
+ * @return the configuration for the task
+ * @throws RuntimeException if the task is not part of this topology
+ */
+ @Override
+ public Configuration getTaskConfiguration(final String taskId) {
+ LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), taskId});
+ if (nodes.get(taskId) == null) {
+ throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+ }
+
+ final int nodeVersion = getNodeVersion(taskId);
+ final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+ builder.bindNamedParameter(DataCodec.class, operatorSpec.getDataCodecClass());
+ builder.bindNamedParameter(TaskVersion.class, Integer.toString(nodeVersion));
+ if (operatorSpec instanceof BroadcastOperatorSpec) {
+ final BroadcastOperatorSpec broadcastSpec = (BroadcastOperatorSpec) operatorSpec;
+ final boolean isSender = taskId.equals(broadcastSpec.getSenderId());
+ if (isSender) {
+ builder.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
+ } else {
+ builder.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
+ }
+ } else if (operatorSpec instanceof ReduceOperatorSpec) {
+ final ReduceOperatorSpec reduceSpec = (ReduceOperatorSpec) operatorSpec;
+ builder.bindNamedParameter(ReduceFunctionParam.class, reduceSpec.getRedFuncClass());
+ final boolean isReceiver = taskId.equals(reduceSpec.getReceiverId());
+ if (isReceiver) {
+ builder.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
+ } else {
+ builder.bindImplementation(GroupCommOperator.class, ReduceSender.class);
+ }
+ }
+ final Configuration taskConf = builder.build();
+ LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + confSer.toString(taskConf));
+ return taskConf;
+ }
+
+ /**
+ * Looks up the version of the node registered for the given task.
+ *
+ * @param taskId id of the task
+ * @return the version reported by the task's node
+ * @throws RuntimeException if no node is registered under that id
+ */
+ @Override
+ public int getNodeVersion(final String taskId) {
+ LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), taskId});
+ final TaskNode taskNode = nodes.get(taskId);
+ if (taskNode != null) {
+ final int nodeVersion = taskNode.getVersion();
+ LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + taskId + " " + nodeVersion);
+ return nodeVersion;
+ }
+ throw new RuntimeException(getQualifiedName() + taskId + " is not available on the nodes map");
+ }
+
+ /**
+ * Removes a task from the topology. Unknown task ids are logged and ignored.
+ * The root task is removed through unsetRootNode, any other task through
+ * removeChild.
+ *
+ * @param taskId id of the task to remove
+ */
+ @Override
+ public void removeTask(final String taskId) {
+ LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), taskId});
+ if (nodes.containsKey(taskId)) {
+ final boolean isRoot = taskId.equals(rootId);
+ if (isRoot) {
+ unsetRootNode(taskId);
+ } else {
+ removeChild(taskId);
+ }
+ LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + taskId);
+ } else {
+ LOG.fine("Trying to remove a non-existent node in the task graph");
+ LOG.exiting("TreeTopology", "removeTask", getQualifiedName());
+ }
+ }
+
+ /**
+ * Adds a task to the topology, as root when its id equals the configured root
+ * id and as a child otherwise. Afterwards the node registered for the task is
+ * remembered as the most recently added node.
+ * <p>
+ * A task id that is already present is only logged; the add still proceeds.
+ *
+ * @param taskId id of the task to add
+ */
+ @Override
+ public void addTask(final String taskId) {
+ LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), taskId});
+ final boolean alreadyPresent = nodes.containsKey(taskId);
+ if (alreadyPresent) {
+ LOG.fine("Got a request to add a task that is already in the graph. " +
+ "We need to block this request till the delete finishes. ***CAUTION***");
+ }
+
+ final boolean isRoot = taskId.equals(rootId);
+ if (isRoot) {
+ setRootNode(taskId);
+ } else {
+ addChild(taskId);
+ }
+ this.prev = nodes.get(taskId);
+ LOG.exiting("TreeTopology", "addTask", getQualifiedName() + taskId);
+ }
+
+ /**
+ * Creates a node for a non-root task and registers it under its task id.
+ * If a logical root exists the node is linked into the tree right away;
+ * otherwise it is only registered and gets linked when setRootNode runs.
+ *
+ * @param taskId id of the task to add as a child
+ */
+ private void addChild(final String taskId) {
+ LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), taskId});
+ LOG.finest(getQualifiedName() + "Adding leaf " + taskId);
+ // NOTE(review): the trailing 'false' presumably marks the node as non-root - confirm in TaskNodeImpl.
+ final TaskNode child = new TaskNodeImpl(senderStage, groupName, operName, taskId, driverId, false);
+ final boolean treeHasRoot = logicalRoot != null;
+ if (treeHasRoot) {
+ addTaskNode(child);
+ }
+ nodes.put(taskId, child);
+ LOG.exiting("TreeTopology", "addChild", getQualifiedName() + taskId);
+ }
+
+ private void addTaskNode(final TaskNode node) {
+ LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), node});
+ if (logicalRoot.getNumberOfChildren() >= this.fanOut) {
+ logicalRoot = logicalRoot.successor();
+ }
+ node.setParent(logicalRoot);
+ logicalRoot.addChild(node);
+ prev.setSibling(node);
+ LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + node);
+ }
+
+ private void removeChild(final String taskId) {
+ LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), taskId});
+ if (root != null) {
+ root.removeChild(nodes.get(taskId));
+ }
+ nodes.remove(taskId);
+ LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + taskId);
+ }
+
+ /**
+ * Creates the root node and rebuilds the tree around it: the logical root and
+ * the previously-added marker are reset to the new root, every node already
+ * registered is linked in (in the node map's iteration order), and finally
+ * the root itself is registered under its id.
+ *
+ * @param rootId id of the root task
+ */
+ private void setRootNode(final String rootId) {
+ LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), rootId});
+ // NOTE(review): the trailing 'true' presumably marks the node as root - confirm in TaskNodeImpl.
+ this.root = new TaskNodeImpl(senderStage, groupName, operName, rootId, driverId, true);
+ this.logicalRoot = this.root;
+ this.prev = this.root;
+
+ // The root is registered only after this loop, so it is never linked to itself.
+ for (final TaskNode existing : nodes.values()) {
+ addTaskNode(existing);
+ this.prev = existing;
+ }
+ nodes.put(rootId, this.root);
+ LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + rootId);
+ }
+
+ /**
+ * Removes the root task: drops the entry stored under the configured root id
+ * and clears the parent of every remaining node.
+ * <p>
+ * NOTE(review): the root, logicalRoot and prev fields are not reset here -
+ * confirm that keeping references to the removed root is intended.
+ *
+ * @param taskId id of the task being removed; used for logging only, the map
+ * entry is removed by the configured root id
+ */
+ private void unsetRootNode(final String taskId) {
+ LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), taskId});
+ nodes.remove(rootId);
+
+ for (final TaskNode orphan : nodes.values()) {
+ orphan.setParent(null);
+ }
+ LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + taskId);
+ }
+
+ /**
+ * Forwards a task failure to the node registered for that task.
+ *
+ * @param taskId id of the failed task
+ * @throws RuntimeException if no node is registered under that id
+ */
+ @Override
+ public void onFailedTask(final String taskId) {
+ LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), taskId});
+ final TaskNode failedNode = nodes.get(taskId);
+ if (failedNode != null) {
+ failedNode.onFailedTask();
+ LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + taskId);
+ return;
+ }
+ throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+ }
+
+ /**
+ * Forwards a task-running notification to the node registered for that task.
+ *
+ * @param taskId id of the running task
+ * @throws RuntimeException if no node is registered under that id
+ */
+ @Override
+ public void onRunningTask(final String taskId) {
+ LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), taskId});
+ final TaskNode runningNode = nodes.get(taskId);
+ if (runningNode != null) {
+ runningNode.onRunningTask();
+ LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + taskId);
+ return;
+ }
+ throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+ }
+
+ /**
+ * Dispatches a message received from a task. TopologyChanges and
+ * UpdateTopology requests are handled by this class; every other message type
+ * is passed as an acknowledgement to the node of the sending task.
+ *
+ * @param msg the received message
+ * @throws RuntimeException if an acknowledgement arrives from a task that has
+ * no node in this topology
+ */
+ @Override
+ public void onReceiptOfMessage(final GroupCommunicationMessage msg) {
+ LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), msg});
+ switch (msg.getType()) {
+ case TopologyChanges:
+ onTopologyChanges(msg);
+ break;
+ case UpdateTopology:
+ onUpdateTopology(msg);
+ break;
+
+ default: {
+ // The sender may be absent from the node map (e.g. already removed); fail with
+ // the same descriptive error the other lookups use instead of a bare NPE.
+ final String srcId = msg.getSrcid();
+ final TaskNode srcNode = nodes.get(srcId);
+ if (srcNode == null) {
+ throw new RuntimeException(getQualifiedName() + srcId + " does not exist");
+ }
+ srcNode.onReceiptOfAcknowledgement(msg);
+ break;
+ }
+ }
+ LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + msg);
+ }
+
+ /**
+ * Handles an UpdateTopology request from a task. Every running node that has
+ * changes is asked to update its topology, and the list of those nodes is
+ * handed to a newly created wait stage. That stage's handler sends
+ * TopologyUpdated back to the requesting task after waiting on each node.
+ *
+ * @param msg the UpdateTopology request; its source id is the task to notify
+ */
+ private void onUpdateTopology(final GroupCommunicationMessage msg) {
+ LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), msg});
+ LOG.fine(getQualifiedName() + "Update affected parts of Topology");
+ // The requester becomes the destination of the eventual TopologyUpdated message.
+ final String dstId = msg.getSrcid();
+ final int version = getNodeVersion(dstId);
+
+ LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
+ // The driver version is passed as the literal 0 here.
+ final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new TopologyUpdateWaitHandler(senderStage, groupName,
+ operName, driverId, 0,
+ dstId, version,
+ getQualifiedName());
+ // NOTE(review): a new stage is created on every call and is not closed in this
+ // method - confirm that it is released elsewhere. The third argument is
+ // presumably the stage's capacity - TODO confirm against SingleThreadStage.
+ final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new SingleThreadStage<>("NodeTopologyUpdateWaitStage",
+ topoUpdateWaitHandler,
+ nodes.size());
+
+ final List<TaskNode> toBeUpdatedNodes = new ArrayList<>(nodes.size());
+ LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
+ // First pass: select the nodes. && short-circuits, so resetTopologySetupSent()
+ // is only invoked on nodes that are running and have changes.
+ for (final TaskNode node : nodes.values()) {
+ if (node.isRunning() && node.hasChanges() && node.resetTopologySetupSent()) {
+ toBeUpdatedNodes.add(node);
+ }
+ }
+ // Second pass: notify each selected node with an empty-payload UpdateTopology message.
+ for (final TaskNode node : toBeUpdatedNodes) {
+ node.updatingTopology();
+ LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology");
+ senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
+ node.getVersion(), Utils.EmptyByteArr));
+ }
+ // Hand the selected nodes to the wait stage only after all of them have been notified.
+ nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
+ LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + msg);
+ }
+
+ /**
+ * Answers a TopologyChanges query from a task. The topology counts as changed
+ * as soon as one node is not running or has changes; the result is encoded
+ * and sent back to the querying task as a TopologyChanges message.
+ *
+ * @param msg the TopologyChanges query; its source id receives the reply
+ */
+ private void onTopologyChanges(final GroupCommunicationMessage msg) {
+ LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), msg});
+ LOG.fine(getQualifiedName() + "Check TopologyChanges");
+ final String requesterId = msg.getSrcid();
+ boolean topologyChanged = false;
+ LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
+ for (final TaskNode node : nodes.values()) {
+ if (node.isRunning() && !node.hasChanges()) {
+ // Running and unchanged: keep looking.
+ continue;
+ }
+ topologyChanged = true;
+ break;
+ }
+ final GroupChanges groupChanges = new GroupChangesImpl(topologyChanged);
+ final Codec<GroupChanges> codec = new GroupChangesCodec();
+ LOG.fine(getQualifiedName() + "TopologyChanges: " + groupChanges);
+ // Look up the requester's version before encoding, as the original argument order did.
+ final int requesterVersion = getNodeVersion(requesterId);
+ final byte[] payload = codec.encode(groupChanges);
+ senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+ ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, requesterId, requesterVersion,
+ payload));
+ LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg);
+ }
+
+ private String getQualifiedName() {
+ return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + " - ";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
new file mode 100644
index 0000000..82424ce
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This package contains the implementation of the driver side of the
+ * Group Communication Service using the tree/flat topology. The Service
+ * can be configured with many named Communication Groups and each
+ * Communication Group can be configured with many named operators. This
+ * configuration is typically done on the GroupCommDriver object injected
+ * into the driver. During this specification only the root nodes are
+ * specified.
+ *
+ * After the GroupCommDriver is configured the driver would want to submit
+ * tasks with the Communication Groups & their operators configured. To do
+ * that the user has to create a partial task configuration containing his
+ * set of configurations and add it to the relevant communication group.
+ * Based on whether the given task is a Master/Slave different roles for
+ * the operators are configured. Once added, the final Configuration containing
+ * operators and their roles encoded can be obtained by the
+ * CommunicationGroupDriver.getConfiguration() call. The topology is complete
+ * once the minimum number of tasks needed for the group to function have been
+ * added.
+ *
+ * The initial configuration dished out by the service
+ * creates a bunch of tasks that are not connected. Connections are
+ * established when the Tasks start running. Each operator defines its own
+ * topology and can have potentially different root nodes. Each node in the
+ * topology, called a TaskNode, is a logical representation of a running Task.
+ * Adding a task to a group creates its TaskNode with TaskState NOT_STARTED.
+ *
+ * The driver side of the service plays a major role in setting up the
+ * topology and making sure that topology is set-up only when the parties
+ * involved are ready to participate in the communication. A topology will
+ * not contain parties that are not active. Active status is given to parties
+ * who have acknowledged the presence of their neighbors and who have been
+ * acknowledged by their neighbors as present. The connection between two
+ * parties is initiated by the driver and the driver then expects the parties
+ * to ACK that their end of the connection has been set-up. Once a party has
+ * ACKed its end of all connections and all its neighbors have ACKed the outgoing part
+ * of their connection to this party the driver sends a TopologySetup msg to
+ * indicate that the topology is usable by this party now. The driver also
+ * listens in on failure events and appropriately updates its state so that
+ * it does not wait for ACKs from failed tasks.
+ *
+ * There are two chains of control:
+ * 1. Driver Events (Running/Failed Tasks)
+ * 2. Tasks (Msgs sent by Task side of Group Communication Service)
+ *
+ * All events and msgs are funneled through the CommunicationGroupDriver so
+ * that all the topologies belonging to different operators configured on the
+ * group are in sync. Without this there is possibility of a deadlock between
+ * the tasks and the driver. So there is no finer level locking other than that
+ * in the CommunicationGroupDriver.
+ *
+ * 1. Driver Events
+ * These are routed to all communication groups and each communication group
+ * routes it to all topologies. The topology will then route this event to the
+ * corresponding TaskNode which will process that event. When a task starts
+ * running it is notified of its running neighbors and the running neighbors
+ * are notified of its running. The TaskNodeStatus object keeps track of the
+ * state of the local topology for this TaskNode. What msgs have been sent to
+ * this node that need to be ACKed, the status of its neighbors and whether
+ * this TaskNode is ready to accept data from a neighboring TaskNode when we
+ * ask the neighbor to check if he was only waiting for this TaskNode to ACK
+ * in order to send TopologySetup. So when we are sending (Parent|Child)(Add|Dead)
+ * msgs we first note that we expect an ACK back for this. These ACK expectations
+ * are then deleted if the node fails. Neighbor failures are also updated.
+ * All the msg sending is done by the TaskNode. The TaskNodeStatus is only a
+ * state manager to consult on the status of ACKs and neighbors. This is needed
+ * by the chkAndSendTopSetup logic. These events also send msgs related to failure
+ * of tasks so that any task in the topology that waited for a response from the
+ * failed task can move on.
+ *
+ * 2. Tasks
+ * We get ACK msgs from tasks and they update the status of ACK expectations.
+ * Here the TaskNodeStatus acts as a bridge between the initiation of a link
+ * between two parties and the final set-up of the link. Once all ACKs have
+ * been received we ask the TaskNode to check if it is ready to send a
+ * TopologySetup msg. Every ACK can also trigger the chkAndSendTopSetup for
+ * a neighbor.
+ *
+ * The above concerns the topology set-up and fault notifications. However, the
+ * other major task that the driver helps with is in updating a topology. When
+ * requested for the update of a Topology using the UpdateTopology msg, the
+ * driver notifies all the parties that have to update their topologies by
+ * sending an UpdateTopology msg to the affected parties. The tasks then try to
+ * enter the UpdateTopology phase and as soon as they can do so (by acquiring a lock)
+ * they respond that they have done so. The driver will wait till all the affected
+ * parties do so and then sends the initiator a msg that Topology has been updated.
+ * The effect of this is that the unaffected regions of the topology can continue
+ * to work normally while the affected regions are healing. The affected regions
+ * heal their (local)topologies using the TopologySetup mechanism described above.
+ * Both update topology and initial set-up use the minimum number of tasks to define
+ * when the set-up is complete.
+ *
+ * The CommGroupDriver class also takes care of minor alterations to event ordering
+ * by the use of locks to coerce the events to occur in a definite way.
+ *
+ */
+package org.apache.reef.io.network.group.impl.driver;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
new file mode 100644
index 0000000..b8dd425
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class BroadcastReceiver<T> implements Broadcast.Receiver<T>, EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG = Logger.getLogger(BroadcastReceiver.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final CommGroupNetworkHandler commGroupNetworkHandler;
+ private final Codec<T> dataCodec;
+ private final NetworkService<GroupCommunicationMessage> netService;
+ private final Sender sender;
+
+ private final OperatorTopology topology;
+
+ private final AtomicBoolean init = new AtomicBoolean(false);
+
+ private final CommunicationGroupServiceClient commGroupClient;
+
+ private final int version;
+
+ @Inject
+ public BroadcastReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage> netService,
+ final CommunicationGroupServiceClient commGroupClient) {
+ super();
+ this.version = version;
+ LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.commGroupNetworkHandler = commGroupNetworkHandler;
+ this.netService = netService;
+ this.sender = new Sender(this.netService);
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+ this.commGroupNetworkHandler.register(this.operName, this);
+ this.commGroupClient = commGroupClient;
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ return "BroadcastReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ @Override
+ public T receive() throws NetworkException, InterruptedException {
+ LOG.entering("BroadcastReceiver", "receive", this);
+ LOG.fine("I am " + this);
+
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized");
+ }
+ // I am an intermediate node or leaf.
+
+ final T retVal;
+ // Wait for parent to send
+ LOG.fine(this + " Waiting to receive broadcast");
+ byte[] data;
+ try {
+ data = topology.recvFromParent();
+ // TODO: Should receive the identity element instead of null
+ if (data == null) {
+ LOG.fine(this + " Received null. Perhaps one of my ancestors is dead.");
+ retVal = null;
+ } else {
+ LOG.finest("Using " + dataCodec.getClass().getSimpleName() + " as codec");
+ retVal = dataCodec.decode(data);
+ LOG.finest("Decoded msg successfully");
+ LOG.fine(this + " Received: " + retVal);
+ LOG.finest(this + " Sending to children.");
+ }
+
+ topology.sendToChildren(data, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException", e);
+ }
+ LOG.exiting("BroadcastReceiver", "receive", Arrays.toString(new Object[]{retVal, this}));
+ return retVal;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
new file mode 100644
index 0000000..6f146e6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG = Logger.getLogger(BroadcastSender.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final CommGroupNetworkHandler commGroupNetworkHandler;
+ private final Codec<T> dataCodec;
+ private final NetworkService<GroupCommunicationMessage> netService;
+ private final Sender sender;
+
+ private final OperatorTopology topology;
+
+ private final AtomicBoolean init = new AtomicBoolean(false);
+
+ private final CommunicationGroupServiceClient commGroupClient;
+
+ private final int version;
+
+ @Inject
+ public BroadcastSender(@Parameter(CommunicationGroupName.class) final String groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage> netService,
+ final CommunicationGroupServiceClient commGroupClient) {
+ super();
+ this.version = version;
+ LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString());
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.commGroupNetworkHandler = commGroupNetworkHandler;
+ this.netService = netService;
+ this.sender = new Sender(this.netService);
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+ this.commGroupNetworkHandler.register(this.operName, this);
+ this.commGroupClient = commGroupClient;
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ return "BroadcastSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ @Override
+ public void send(final T element) throws NetworkException, InterruptedException {
+ LOG.entering("BroadcastSender", "send", new Object[]{this, element});
+ LOG.fine("I am " + this);
+
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized");
+ }
+
+ try {
+ LOG.fine(this + " Broadcasting " + element);
+ topology.sendToChildren(dataCodec.encode(element), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException", e);
+ }
+ LOG.exiting("BroadcastSender", "send", Arrays.toString(new Object[]{this, element}));
+ }
+
+}