You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/03/06 02:55:33 UTC

[3/8] incubator-reef git commit: [REEF-118] Add Shimoga library for elastic group communication.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
new file mode 100644
index 0000000..7483483
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EStage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+public class TaskNodeImpl implements TaskNode {
+
+  private static final Logger LOG = Logger.getLogger(TaskNodeImpl.class.getName());
+
+  private final EStage<GroupCommunicationMessage> senderStage;
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final String taskId;
+  private final String driverId;
+
+  private final boolean isRoot;
+  private TaskNode parent;
+  private TaskNode sibling;
+  private final List<TaskNode> children = new ArrayList<>();
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final AtomicBoolean topoSetupSent = new AtomicBoolean(false);
+
+  private final TaskNodeStatus taskNodeStatus;
+
+  private final AtomicInteger version = new AtomicInteger(0);
+
+  public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage,
+                      final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName,
+                      final String taskId, final String driverId, final boolean isRoot) {
+    this.senderStage = senderStage;
+    this.groupName = groupName;
+    this.operName = operatorName;
+    this.taskId = taskId;
+    this.driverId = driverId;
+    this.isRoot = isRoot;
+    taskNodeStatus = new TaskNodeStatusImpl(groupName, operatorName, taskId, this);
+  }
+
+  @Override
+  public void setSibling(final TaskNode leaf) {
+    LOG.entering("TaskNodeImpl", "setSibling", new Object[]{getQualifiedName(), leaf});
+    sibling = leaf;
+    LOG.exiting("TaskNodeImpl", "setSibling", getQualifiedName());
+  }
+
+  @Override
+  public int getNumberOfChildren() {
+    LOG.entering("TaskNodeImpl", "getNumberOfChildren", getQualifiedName());
+    final int size = children.size();
+    LOG.exiting("TaskNodeImpl", "getNumberOfChildren", getQualifiedName() + size);
+    return size;
+  }
+
+  @Override
+  public TaskNode successor() {
+    LOG.entering("TaskNodeImpl", "successor", getQualifiedName());
+    LOG.exiting("TaskNodeImpl", "successor", getQualifiedName() + sibling);
+    return sibling;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + taskId + "," + version.get() + ")";
+  }
+
+  /**
+   * * Methods pertaining to my status change ***
+   */
+  @Override
+  public void onFailedTask() {
+    LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName());
+    if (!running.compareAndSet(true, false)) {
+      LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
+      LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
+      return;
+    }
+    taskNodeStatus.clearStateAndReleaseLocks();
+    LOG.finest(getQualifiedName() + "Changed status to failed.");
+    LOG.finest(getQualifiedName() + "Resetting topoSetupSent to false");
+    topoSetupSent.set(false);
+    if (parent != null && parent.isRunning()) {
+      parent.onChildDead(taskId);
+    } else {
+      LOG.finest(getQualifiedName() + "Skipping asking parent to process child death");
+    }
+    for (final TaskNode child : children) {
+      if (child.isRunning()) {
+        child.onParentDead();
+      }
+    }
+    final int version = this.version.incrementAndGet();
+    LOG.finest(getQualifiedName() + "Bumping up to version-" + version);
+    LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName());
+  }
+
+  @Override
+  public void onRunningTask() {
+    LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName());
+    if (!running.compareAndSet(false, true)) {
+      LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
+      LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
+      return;
+    }
+    final int version = this.version.get();
+    LOG.finest(getQualifiedName() + "Changed status to running version-" + version);
+    if (parent != null && parent.isRunning()) {
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(),
+          parent.getVersion(), taskId,
+          version, Utils.EmptyByteArr);
+      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+      senderStage.onNext(gcm);
+      parent.onChildRunning(taskId);
+    } else {
+      LOG.finest(getQualifiedName() + "Skipping src add to & for parent");
+    }
+    for (final TaskNode child : children) {
+      if (child.isRunning()) {
+        final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(),
+            child.getVersion(), taskId, version,
+            Utils.EmptyByteArr);
+        taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+        senderStage.onNext(gcm);
+        child.onParentRunning();
+      }
+    }
+    LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName());
+  }
+
+  /**
+   * * Methods pertaining to my status change ends ***
+   */
+
+  @Override
+  public void onParentRunning() {
+    LOG.entering("TaskNodeImpl", "onParentRunning", getQualifiedName());
+    if (parent != null && parent.isRunning()) {
+      final int parentVersion = parent.getVersion();
+      final String parentTaskId = parent.getTaskId();
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId,
+          parentVersion, taskId, version.get(),
+          Utils.EmptyByteArr);
+      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+      senderStage.onNext(gcm);
+    } else {
+      LOG.finer(getQualifiedName() + "Parent was running when I was asked to add him."
+          + " However, he is not active anymore. Returning without sending ParentAdd" + " msg. ***CHECK***");
+    }
+    LOG.exiting("TaskNodeImpl", "onParentRunning", getQualifiedName());
+  }
+
+  @Override
+  public void onParentDead() {
+    LOG.entering("TaskNodeImpl", "onParentDead", getQualifiedName());
+    if (parent != null) {
+      final int parentVersion = parent.getVersion();
+      final String parentTaskId = parent.getTaskId();
+      taskNodeStatus.updateFailureOf(parent.getTaskId());
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId,
+          parentVersion, taskId, version.get(),
+          Utils.EmptyByteArr);
+      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+      senderStage.onNext(gcm);
+    } else {
+      throw new RuntimeException(getQualifiedName() + "Don't expect parent to be null. Something wrong");
+    }
+    LOG.exiting("TaskNodeImpl", "onParentDead", getQualifiedName());
+  }
+
+  @Override
+  public void onChildRunning(final String childId) {
+    LOG.entering("TaskNodeImpl", "onChildRunning", new Object[]{getQualifiedName(), childId});
+    final TaskNode childTask = findTask(childId);
+    if (childTask != null && childTask.isRunning()) {
+      final int childVersion = childTask.getVersion();
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId,
+          childVersion, taskId, version.get(),
+          Utils.EmptyByteArr);
+      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+      senderStage.onNext(gcm);
+    } else {
+      LOG.fine(getQualifiedName() + childId + " was running when I was asked to add him."
+          + " However, I can't find a task corresponding to him now."
+          + " Returning without sending ChildAdd msg. ***CHECK***");
+    }
+    LOG.exiting("TaskNodeImpl", "onChildRunning", getQualifiedName() + childId);
+  }
+
+  @Override
+  public void onChildDead(final String childId) {
+    LOG.entering("TaskNodeImpl", "onChildDead", new Object[]{getQualifiedName(), childId});
+    final TaskNode childTask = findChildTask(childId);
+    if (childTask != null) {
+      final int childVersion = childTask.getVersion();
+      taskNodeStatus.updateFailureOf(childId);
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId,
+          childVersion, taskId, version.get(),
+          Utils.EmptyByteArr);
+      taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
+      senderStage.onNext(gcm);
+    } else {
+      throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + " to be null. Something wrong");
+    }
+    LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId);
+  }
+
+  /**
+   * * Methods pertaining to my neighbors status change ends ***
+   */
+
+  @Override
+  public void onReceiptOfAcknowledgement(final GroupCommunicationMessage msg) {
+    LOG.entering("TaskNodeImpl", "onReceiptOfAcknowledgement", new Object[]{getQualifiedName(), msg});
+    taskNodeStatus.processAcknowledgement(msg);
+    LOG.exiting("TaskNodeImpl", "onReceiptOfAcknowledgement", getQualifiedName() + msg);
+  }
+
+  @Override
+  public void updatingTopology() {
+    LOG.entering("TaskNodeImpl", "updatingTopology", getQualifiedName());
+    taskNodeStatus.updatingTopology();
+    LOG.exiting("TaskNodeImpl", "updatingTopology", getQualifiedName());
+  }
+
+  @Override
+  public String getTaskId() {
+    return taskId;
+  }
+
+  @Override
+  public void addChild(final TaskNode child) {
+    LOG.entering("TaskNodeImpl", "addChild", new Object[]{getQualifiedName(), child.getTaskId()});
+    children.add(child);
+    LOG.exiting("TaskNodeImpl", "addChild", getQualifiedName() + child);
+  }
+
+  @Override
+  public void removeChild(final TaskNode child) {
+    LOG.entering("TaskNodeImpl", "removeChild", new Object[]{getQualifiedName(), child.getTaskId()});
+    children.remove(child);
+    LOG.exiting("TaskNodeImpl", "removeChild", getQualifiedName() + child);
+  }
+
+  @Override
+  public void setParent(final TaskNode parent) {
+    LOG.entering("TaskNodeImpl", "setParent", new Object[]{getQualifiedName(), parent});
+    this.parent = parent;
+    LOG.exiting("TaskNodeImpl", "setParent", getQualifiedName() + parent);
+  }
+
+  @Override
+  public boolean isRunning() {
+    LOG.entering("TaskNodeImpl", "isRunning", getQualifiedName());
+    final boolean b = running.get();
+    LOG.exiting("TaskNodeImpl", "isRunning", getQualifiedName() + b);
+    return b;
+  }
+
+  @Override
+  public TaskNode getParent() {
+    LOG.entering("TaskNodeImpl", "getParent", getQualifiedName());
+    LOG.exiting("TaskNodeImpl", "getParent", getQualifiedName() + parent);
+    return parent;
+  }
+
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + getVersion() + ") - ";
+  }
+
+  @Override
+  public boolean isNeighborActive(final String neighborId) {
+    LOG.entering("TaskNodeImpl", "isNeighborActive", new Object[]{getQualifiedName(), neighborId});
+    final boolean active = taskNodeStatus.isActive(neighborId);
+    LOG.exiting("TaskNodeImpl", "isNeighborActive", getQualifiedName() + active);
+    return active;
+  }
+
+  @Override
+  public boolean resetTopologySetupSent() {
+    LOG.entering("TaskNodeImpl", "resetTopologySetupSent", new Object[]{getQualifiedName(),});
+    final boolean retVal = topoSetupSent.compareAndSet(true, false);
+    LOG.exiting("TaskNodeImpl", "resetTopologySetupSent", getQualifiedName() + retVal);
+    return retVal;
+  }
+
+  @Override
+  public void checkAndSendTopologySetupMessage() {
+    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
+    if (!topoSetupSent.get()
+        && (parentActive() && activeNeighborOfParent())
+        && (allChildrenActive() && activeNeighborOfAllChildren())) {
+      sendTopoSetupMsg();
+    }
+    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName());
+  }
+
+  private void sendTopoSetupMsg() {
+    LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId);
+    LOG.fine(getQualifiedName() + "is an active participant in the topology");
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId,
+        version.get(), Utils.EmptyByteArr));
+    taskNodeStatus.onTopologySetupMessageSent();
+    final boolean sentAlready = !topoSetupSent.compareAndSet(false, true);
+    if (sentAlready) {
+      LOG.fine(getQualifiedName() + "TopologySetup msg was sent more than once. Something fishy!!!");
+    }
+    LOG.exiting("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName());
+  }
+
+  @Override
+  public void checkAndSendTopologySetupMessageFor(final String source) {
+    LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", new Object[]{getQualifiedName(), source});
+    final TaskNode srcNode = findTask(source);
+    if (srcNode != null) {
+      srcNode.checkAndSendTopologySetupMessage();
+    }
+    LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", getQualifiedName() + source);
+  }
+
+  /**
+   * @param sourceId
+   * @return
+   */
+  private TaskNode findTask(final String sourceId) {
+    LOG.entering("TaskNodeImpl", "findTask", new Object[]{getQualifiedName(), sourceId});
+    final TaskNode retNode;
+    if (parent != null && parent.getTaskId().equals(sourceId)) {
+      retNode = parent;
+    } else {
+      retNode = findChildTask(sourceId);
+    }
+    LOG.exiting("TaskNodeImpl", "findTask", getQualifiedName() + retNode);
+    return retNode;
+  }
+
+  private TaskNode findChildTask(final String sourceId) {
+    LOG.entering("TaskNodeImpl", "findChildTask", new Object[]{getQualifiedName(), sourceId});
+    TaskNode retNode = null;
+    for (final TaskNode child : children) {
+      if (child.getTaskId().equals(sourceId)) {
+        retNode = child;
+        break;
+      }
+    }
+    LOG.exiting("TaskNodeImpl", "findChildTask", getQualifiedName() + retNode);
+    return retNode;
+  }
+
+  private boolean parentActive() {
+    LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName());
+    if (isRoot) {
+      LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+      return true;
+    }
+    if (isNeighborActive(parent.getTaskId())) {
+      LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neghbor"}));
+      return true;
+    }
+    LOG.exiting("TaskNodeImpl", "parentActive", getQualifiedName() + "Neither root Nor is " + parent + " an active neghbor");
+    return false;
+  }
+
+  private boolean activeNeighborOfParent() {
+    LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName());
+    if (isRoot) {
+      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+      return true;
+    }
+    if (parent.isNeighborActive(taskId)) {
+      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am an active neighbor of parent ", parent}));
+      return true;
+    }
+    LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), "Neither is parent null Nor am I an active neighbor of parent ", parent}));
+    return false;
+  }
+
+  private boolean allChildrenActive() {
+    LOG.entering("TaskNodeImpl", "allChildrenActive", getQualifiedName());
+    for (final TaskNode child : children) {
+      final String childId = child.getTaskId();
+      if (child.isRunning() && !isNeighborActive(childId)) {
+        LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"}));
+        return false;
+      }
+    }
+    LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
+    return true;
+  }
+
+  private boolean activeNeighborOfAllChildren() {
+    LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName());
+    for (final TaskNode child : children) {
+      if (child.isRunning() && !child.isNeighborActive(taskId)) {
+        LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
+        return false;
+      }
+    }
+    LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
+    return true;
+  }
+
+  @Override
+  public void waitForTopologySetupOrFailure() {
+    LOG.entering("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
+    taskNodeStatus.waitForTopologySetup();
+    LOG.exiting("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName());
+  }
+
+  @Override
+  public boolean hasChanges() {
+    LOG.entering("TaskNodeImpl", "hasChanges", getQualifiedName());
+    final boolean changes = taskNodeStatus.hasChanges();
+    LOG.exiting("TaskNodeImpl", "hasChanges", getQualifiedName() + changes);
+    return changes;
+  }
+
+  @Override
+  public int getVersion() {
+    return version.get();
+  }
+
+  @Override
+  public int hashCode() {
+    int r = taskId.hashCode();
+    r = 31 * r + version.get();
+    return r;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj != this) {
+      if (obj instanceof TaskNodeImpl) {
+        final TaskNodeImpl that = (TaskNodeImpl) obj;
+        if (this.taskId.equals(that.taskId) && this.version.get() == that.version.get()) {
+          return true;
+        } else {
+          return false;
+        }
+      } else {
+        return false;
+      }
+    } else {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
new file mode 100644
index 0000000..3f904e2
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap;
+import org.apache.reef.io.network.group.impl.utils.CountingMap;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class TaskNodeStatusImpl implements TaskNodeStatus {
+
+  private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName());
+
+  private final ConcurrentCountingMap<Type, String> statusMap = new ConcurrentCountingMap<>();
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final String taskId;
+  private final Set<String> activeNeighbors = new HashSet<>();
+  private final CountingMap<String> neighborStatus = new CountingMap<>();
+  private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
+  private final Object topoUpdateStageLock = new Object();
+  private final Object topoSetupSentLock = new Object();
+  private final TaskNode node;
+
+  public TaskNodeStatusImpl(final Class<? extends Name<String>> groupName,
+                            final Class<? extends Name<String>> operName, final String taskId, final TaskNode node) {
+    this.groupName = groupName;
+    this.operName = operName;
+    this.taskId = taskId;
+    this.node = node;
+  }
+
+  private boolean isDeadMsg(final Type msgAcked) {
+    return msgAcked == Type.ParentDead || msgAcked == Type.ChildDead;
+  }
+
+  private boolean isAddMsg(final Type msgAcked) {
+    return msgAcked == Type.ParentAdd || msgAcked == Type.ChildAdd;
+  }
+
+  private Type getAckedMsg(final Type msgType) {
+    switch (msgType) {
+      case ParentAdded:
+        return Type.ParentAdd;
+      case ChildAdded:
+        return Type.ChildAdd;
+      case ParentRemoved:
+        return Type.ParentDead;
+      case ChildRemoved:
+        return Type.ChildDead;
+      default:
+        return msgType;
+    }
+  }
+
+  private void chkIamActiveToSendTopoSetup(final Type msgDealt) {
+    LOG.entering("TaskNodeStatusImpl", "chkAndSendTopoSetup", new Object[]{getQualifiedName(), msgDealt});
+    if (statusMap.isEmpty()) {
+      LOG.finest(getQualifiedName() + "Empty status map.");
+      node.checkAndSendTopologySetupMessage();
+    } else {
+      LOG.finest(getQualifiedName() + "Status map non-empty" + statusMap);
+    }
+    LOG.exiting("TaskNodeStatusImpl", "chkAndSendTopoSetup", getQualifiedName() + msgDealt);
+  }
+
+  @Override
+  public void onTopologySetupMessageSent() {
+    LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
+    neighborStatus.clear();
+    synchronized (topoSetupSentLock) {
+      topoSetupSentLock.notifyAll();
+    }
+    LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
+  }
+
+  @Override
+  public boolean isActive(final String neighborId) {
+    LOG.entering("TaskNodeStatusImpl", "isActive", new Object[]{getQualifiedName(), neighborId});
+    final boolean contains = activeNeighbors.contains(neighborId);
+    LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + contains);
+    return contains;
+  }
+
+  /**
+   * This needs to happen in line rather than in a stage because we need to note
+   * the messages we send to the tasks before we start processing msgs from the
+   * nodes.(Acks & Topology msgs)
+   */
+  @Override
+  public void expectAckFor(final Type msgType, final String srcId) {
+    LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId});
+    LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources");
+    statusMap.add(msgType, srcId);
+    LOG.exiting("TaskNodeStatusImpl", "expectAckFor", getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType));
+  }
+
+  @Override
+  public void clearStateAndReleaseLocks() {
+    LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
+    statusMap.clear();
+    activeNeighbors.clear();
+    neighborStatus.clear();
+    updatingTopo.compareAndSet(true, false);
+    synchronized (topoSetupSentLock) {
+      topoSetupSentLock.notifyAll();
+    }
+    synchronized (topoUpdateStageLock) {
+      topoUpdateStageLock.notifyAll();
+    }
+    LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
+  }
+
+  @Override
+  public void updateFailureOf(final String taskId) {
+    LOG.entering("TaskNodeStatusImpl", "updateFailureOf", new Object[]{getQualifiedName(), taskId});
+    activeNeighbors.remove(taskId);
+    neighborStatus.remove(taskId);
+    LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName());
+  }
+
+  @Override
+  public void processAcknowledgement(final GroupCommunicationMessage gcm) {
+    LOG.entering("TaskNodeStatusImpl", "processMsg", new Object[]{getQualifiedName(), gcm});
+    final String self = gcm.getSrcid();
+    final Type msgType = gcm.getType();
+    final Type msgAcked = getAckedMsg(msgType);
+    final String sourceId = gcm.getDestid();
+    switch (msgType) {
+      case TopologySetup:
+        synchronized (topoUpdateStageLock) {
+          if (!updatingTopo.compareAndSet(true, false)) {
+            LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false");
+          }
+          topoUpdateStageLock.notifyAll();
+        }
+        break;
+      case ParentAdded:
+      case ChildAdded:
+      case ParentRemoved:
+      case ChildRemoved:
+        processNeighborAcks(gcm, msgType, msgAcked, sourceId);
+        break;
+
+      default:
+        LOG.fine(getQualifiedName() + "Non ACK msg " + gcm.getType() + " for " + gcm.getDestid() + " unexpected");
+        break;
+    }
+    LOG.exiting("TaskNodeStatusImpl", "processMsg", getQualifiedName());
+  }
+
+  private void processNeighborAcks(final GroupCommunicationMessage gcm, final Type msgType, final Type msgAcked,
+                                   final String sourceId) {
+    LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
+    if (statusMap.containsKey(msgAcked)) {
+      if (statusMap.contains(msgAcked, sourceId)) {
+        statusMap.remove(msgAcked, sourceId);
+        updateNeighborStatus(msgAcked, sourceId);
+        checkNeighborActiveToSendTopoSetup(sourceId);
+        chkIamActiveToSendTopoSetup(msgAcked);
+      } else {
+        LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + msgType + " from a source(" + sourceId
+            + ") to whom ChildAdd was not sent. "
+            + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
+      }
+    } else {
+      LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + msgAcked
+          + " msgs sent in the previous update cycle. "
+          + "Perhaps reset during failure. If context not indicative use ***CAUTION***");
+    }
+    LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm);
+  }
+
+  private void checkNeighborActiveToSendTopoSetup(final String sourceId) {
+    LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(),
+        sourceId});
+    if (statusMap.notContains(sourceId)) {
+      //All msgs corresponding to sourceId have been ACKed
+      if (neighborStatus.get(sourceId) > 0) {
+        activeNeighbors.add(sourceId);
+        node.checkAndSendTopologySetupMessageFor(sourceId);
+      } else {
+        LOG.finest(getQualifiedName() + sourceId + " is not a neighbor anymore");
+      }
+    } else {
+      LOG.finest(getQualifiedName() + "Not done processing " + sourceId + " acks yet. So it is still inactive");
+    }
+    LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + sourceId);
+  }
+
+  private void updateNeighborStatus(final Type msgAcked, final String sourceId) {
+    LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
+    if (isAddMsg(msgAcked)) {
+      neighborStatus.add(sourceId);
+    } else if (isDeadMsg(msgAcked)) {
+      neighborStatus.remove(sourceId);
+    } else {
+      throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + msgAcked + " from " + sourceId);
+    }
+    LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId});
+  }
+
+  @Override
+  public void updatingTopology() {
+    LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
+    final boolean topoBeingUpdated = !updatingTopo.compareAndSet(false, true);
+    if (topoBeingUpdated) {
+      throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true");
+    }
+    LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
+  }
+
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + node.getVersion() + ") - ";
+  }
+
+  @Override
+  public boolean hasChanges() {
+    LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName());
+    final boolean notEmpty = !statusMap.isEmpty();
+    LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + notEmpty);
+    return notEmpty;
+  }
+
+  @Override
+  public void waitForTopologySetup() {
+    LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName());
+    LOG.finest("Waiting to acquire topoUpdateStageLock");
+    synchronized (topoUpdateStageLock) {
+      LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. updatingTopo: " + updatingTopo.get());
+      while (updatingTopo.get() && node.isRunning()) {
+        try {
+          LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock");
+          topoUpdateStageLock.wait();
+        } catch (final InterruptedException e) {
+          throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage "
+              + "while waiting for receiving TopologySetup", e);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
new file mode 100644
index 0000000..8404d89
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+/**
+ * The states a task can be in: not yet started, running, or failed.
+ */
+public enum TaskState {
+  NOT_STARTED,
+  RUNNING,
+  FAILED
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
new file mode 100644
index 0000000..228cf4b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler for {@link FailedEvaluator} events. If the failed evaluator
+ * reports a failed task, the handler calls {@code failTask} and then
+ * {@code removeTask} on the wrapped {@link CommunicationGroupDriverImpl}
+ * with that task's id; otherwise it only traces the event.
+ */
+public class TopologyFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+
+  private static final Logger LOG = Logger.getLogger(TopologyFailedEvaluatorHandler.class.getName());
+
+  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+  /**
+   * @param communicationGroupDriverImpl the communication group driver that is told about failed tasks
+   */
+  public TopologyFailedEvaluatorHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+  }
+
+  @Override
+  public void onNext(final FailedEvaluator failedEvaluator) {
+    final String evaluatorId = failedEvaluator.getId();
+    LOG.entering("TopologyFailedEvaluatorHandler", "onNext", evaluatorId);
+    final boolean carriesFailedTask = failedEvaluator.getFailedTask().isPresent();
+    if (carriesFailedTask) {
+      final String taskId = failedEvaluator.getFailedTask().get().getId();
+      LOG.finest("Failed Evaluator contains a failed task: " + taskId);
+      // Order matters: the task is failed first and removed afterwards.
+      communicationGroupDriverImpl.failTask(taskId);
+      communicationGroupDriverImpl.removeTask(taskId);
+    }
+    LOG.exiting("TopologyFailedEvaluatorHandler", "onNext", evaluatorId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
new file mode 100644
index 0000000..a7ce7f7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedTaskHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler for {@link FailedTask} events: passes the id of the failed
+ * task to {@code failTask} on the wrapped {@link CommunicationGroupDriverImpl}.
+ */
+public class TopologyFailedTaskHandler implements EventHandler<FailedTask> {
+
+  private static final Logger LOG = Logger.getLogger(TopologyFailedTaskHandler.class.getName());
+
+  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+  /**
+   * @param communicationGroupDriverImpl the communication group driver that is told about failed tasks
+   */
+  public TopologyFailedTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+  }
+
+  @Override
+  public void onNext(final FailedTask failedTask) {
+    final String taskId = failedTask.getId();
+    LOG.entering("TopologyFailedTaskHandler", "onNext", taskId);
+    communicationGroupDriverImpl.failTask(taskId);
+    LOG.exiting("TopologyFailedTaskHandler", "onNext", taskId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
new file mode 100644
index 0000000..e651aa7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler for {@link GroupCommunicationMessage}s: hands every received
+ * message to {@code processMsg} on the wrapped {@link CommunicationGroupDriverImpl}.
+ */
+public class TopologyMessageHandler implements EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = Logger.getLogger(TopologyMessageHandler.class.getName());
+
+  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+  /**
+   * @param communicationGroupDriverImpl the communication group driver that processes the messages
+   */
+  public TopologyMessageHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+  }
+
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    LOG.entering("TopologyMessageHandler", "onNext", msg);
+    communicationGroupDriverImpl.processMsg(msg);
+    LOG.exiting("TopologyMessageHandler", "onNext", msg);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
new file mode 100644
index 0000000..d6e0fae
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyRunningTaskHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Logger;
+
+/**
+ * Event handler for {@link RunningTask} events: passes the id of the running
+ * task to {@code runTask} on the wrapped {@link CommunicationGroupDriverImpl}.
+ */
+public class TopologyRunningTaskHandler implements EventHandler<RunningTask> {
+
+  private static final Logger LOG = Logger.getLogger(TopologyRunningTaskHandler.class.getName());
+
+  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;
+
+  /**
+   * @param communicationGroupDriverImpl the communication group driver that is told about running tasks
+   */
+  public TopologyRunningTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
+    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
+  }
+
+  @Override
+  public void onNext(final RunningTask runningTask) {
+    final String taskId = runningTask.getId();
+    LOG.entering("TopologyRunningTaskHandler", "onNext", taskId);
+    communicationGroupDriverImpl.runTask(taskId);
+    LOG.exiting("TopologyRunningTaskHandler", "onNext", taskId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
new file mode 100644
index 0000000..3905d47
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Event handler that waits for a list of {@link TaskNode}s to finish their
+ * topology setup (or to fail) and afterwards sends a single
+ * {@code TopologyUpdated} message to the configured destination task.
+ */
+public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {
+
+  private static final Logger LOG = Logger.getLogger(TopologyUpdateWaitHandler.class.getName());
+  private final EStage<GroupCommunicationMessage> senderStage;
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final String driverId;
+  private final int driverVersion;
+  private final String dstId;
+  private final int dstVersion;
+  private final String qualifiedName;
+
+
+  /**
+   * The handler will wait for all nodes to acquire topoLock
+   * and send TopologySetup msg. Then it will send TopologyUpdated
+   * msg. However, any local topology changes are not in effect
+   * till driver sends TopologySetup once statusMap is emptied.
+   * The operations in the tasks that have topology changes will
+   * wait for this. However, other tasks that do not have any changes
+   * will continue their regular operation.
+   *
+   * @param senderStage   stage that the {@code TopologyUpdated} message is handed to
+   * @param groupName     name of the communication group the message belongs to
+   * @param operName      name of the operator the message belongs to
+   * @param driverId      id of the driver, used when building the outgoing message
+   * @param driverVersion version of the driver, used when building the outgoing message
+   * @param dstId         id of the task that receives the {@code TopologyUpdated} message
+   * @param dstVersion    version of the task that receives the message
+   * @param qualifiedName prefix prepended to every log message of this handler
+   */
+  public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
+                                   final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName,
+                                   final String driverId, final int driverVersion, final String dstId, final int dstVersion,
+                                   final String qualifiedName) {
+    this.senderStage = senderStage;
+    this.groupName = groupName;
+    this.operName = operName;
+    this.driverId = driverId;
+    this.driverVersion = driverVersion;
+    this.dstId = dstId;
+    this.dstVersion = dstVersion;
+    this.qualifiedName = qualifiedName;
+  }
+
+
+  /**
+   * Waits, node by node, until each given node has completed its topology setup
+   * or has failed, then notifies the destination task with a
+   * {@code TopologyUpdated} message. The message is sent regardless of whether
+   * individual nodes failed; an empty list leads to an immediate notification.
+   *
+   * @param nodes the nodes whose topology is being updated
+   */
+  @Override
+  public void onNext(final List<TaskNode> nodes) {
+    LOG.entering("TopologyUpdateWaitHandler", "onNext", new Object[]{qualifiedName, nodes});
+
+    for (final TaskNode node : nodes) {
+      LOG.fine(qualifiedName + "Waiting for " + node + " to enter TopologyUpdate phase");
+      node.waitForTopologySetupOrFailure();
+      if (node.isRunning()) {
+        LOG.fine(qualifiedName + node + " is in TopologyUpdate phase");
+      } else {
+        LOG.fine(qualifiedName + node + " has failed");
+      }
+    }
+    LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes have received TopologySetup");
+    LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
+        + dstId + "," + dstVersion + ")");
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
+        dstVersion, Utils.EmptyByteArr));
+    LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
new file mode 100644
index 0000000..b1695a9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.api.driver.Topology;
+import org.apache.reef.io.network.group.impl.GroupChangesCodec;
+import org.apache.reef.io.network.group.impl.GroupChangesImpl;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
+import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
+import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
+import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.logging.Logger;
+
+/**
+ * Implements a tree topology with the specified Fan Out.
+ * <p>
+ * Tasks are kept in a map keyed by task id. Once the root task has been added,
+ * every other task is attached below the current "logical root"; when that node
+ * already has {@code fanOut} children, the logical root advances to its successor.
+ */
+public class TreeTopology implements Topology {
+
+  private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName());
+
+  private final EStage<GroupCommunicationMessage> senderStage;
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final String driverId;
+  private String rootId;
+  private OperatorSpec operatorSpec;
+
+  private TaskNode root;
+  // Node that currently receives new children.
+  private TaskNode logicalRoot;
+  // Most recently added node; the next node added becomes its sibling.
+  private TaskNode prev;
+  private final int fanOut;
+
+  private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>();
+  private final ConfigurationSerializer confSer = new AvroConfigurationSerializer();
+
+
+  /**
+   * @param senderStage   stage used to send messages to the tasks
+   * @param groupName     name of the communication group this topology belongs to
+   * @param operatorName  name of the operator this topology belongs to
+   * @param driverId      id of the driver, used as source id of outgoing messages
+   * @param numberOfTasks currently unused by this implementation
+   * @param fanOut        maximum number of children attached to a node before moving on to its successor
+   */
+  public TreeTopology(final EStage<GroupCommunicationMessage> senderStage,
+                      final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName,
+                      final String driverId, final int numberOfTasks, final int fanOut) {
+    this.senderStage = senderStage;
+    this.groupName = groupName;
+    this.operName = operatorName;
+    this.driverId = driverId;
+    this.fanOut = fanOut;
+    LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut);
+  }
+
+  /**
+   * Records the id of the task that acts as the root. The root node itself is
+   * only created when a task with this id is added via {@link #addTask(String)}.
+   */
+  @Override
+  public void setRootTask(final String rootId) {
+    LOG.entering("TreeTopology", "setRootTask", new Object[]{getQualifiedName(), rootId});
+    this.rootId = rootId;
+    LOG.exiting("TreeTopology", "setRootTask", getQualifiedName() + rootId);
+  }
+
+  /**
+   * @return the id set through {@link #setRootTask(String)}, or {@code null} if none was set
+   */
+  @Override
+  public String getRootId() {
+    LOG.entering("TreeTopology", "getRootId", getQualifiedName());
+    LOG.exiting("TreeTopology", "getRootId", getQualifiedName() + rootId);
+    return rootId;
+  }
+
+  /**
+   * Sets the operator specification that {@link #getTaskConfiguration(String)} uses.
+   */
+  @Override
+  public void setOperatorSpecification(final OperatorSpec spec) {
+    LOG.entering("TreeTopology", "setOperSpec", new Object[]{getQualifiedName(), spec});
+    this.operatorSpec = spec;
+    LOG.exiting("TreeTopology", "setOperSpec", getQualifiedName() + spec);
+  }
+
+  /**
+   * Builds the operator configuration for the given task: data codec, task version
+   * and the operator implementation matching the task's role. For a broadcast the
+   * sender id gets {@link BroadcastSender} and all others {@link BroadcastReceiver};
+   * for a reduce the receiver id gets {@link ReduceReceiver} and all others
+   * {@link ReduceSender}. Other operator specs get no implementation binding.
+   *
+   * @param taskId id of a task previously added to this topology
+   * @return the configuration for that task
+   * @throws RuntimeException if the task is not part of this topology
+   */
+  @Override
+  public Configuration getTaskConfiguration(final String taskId) {
+    LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), taskId});
+    final TaskNode taskNode = nodes.get(taskId);
+    if (taskNode == null) {
+      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+    }
+
+    final int version = getNodeVersion(taskId);
+    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
+    jcb.bindNamedParameter(DataCodec.class, operatorSpec.getDataCodecClass());
+    jcb.bindNamedParameter(TaskVersion.class, Integer.toString(version));
+    if (operatorSpec instanceof BroadcastOperatorSpec) {
+      final BroadcastOperatorSpec broadcastOperatorSpec = (BroadcastOperatorSpec) operatorSpec;
+      if (taskId.equals(broadcastOperatorSpec.getSenderId())) {
+        jcb.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
+      }
+    } else if (operatorSpec instanceof ReduceOperatorSpec) {
+      final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) operatorSpec;
+      jcb.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass());
+      if (taskId.equals(reduceOperatorSpec.getReceiverId())) {
+        jcb.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
+      }
+    }
+    final Configuration retConf = jcb.build();
+    // Logger.exiting logs at FINER; only pay for serialising the whole
+    // configuration when that trace record will actually be emitted.
+    if (LOG.isLoggable(java.util.logging.Level.FINER)) {
+      LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + confSer.toString(retConf));
+    }
+    return retConf;
+  }
+
+  /**
+   * @param taskId id of a task previously added to this topology
+   * @return the current version of that task's node
+   * @throws RuntimeException if the task is not part of this topology
+   */
+  @Override
+  public int getNodeVersion(final String taskId) {
+    LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), taskId});
+    final TaskNode node = nodes.get(taskId);
+    if (node == null) {
+      throw new RuntimeException(getQualifiedName() + taskId + " is not available on the nodes map");
+    }
+    final int version = node.getVersion();
+    LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + taskId + " " + version);
+    return version;
+  }
+
+  /**
+   * Removes the given task from the topology. Unknown task ids are ignored.
+   */
+  @Override
+  public void removeTask(final String taskId) {
+    LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), taskId});
+    if (!nodes.containsKey(taskId)) {
+      LOG.fine("Trying to remove a non-existent node in the task graph");
+      LOG.exiting("TreeTopology", "removeTask", getQualifiedName());
+      return;
+    }
+    if (taskId.equals(rootId)) {
+      unsetRootNode(taskId);
+    } else {
+      removeChild(taskId);
+    }
+    LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Adds the given task to the topology, as root if its id equals the configured
+   * root id and as a child otherwise. A request for an id that is already present
+   * is only logged; the existing node is then replaced by a new one.
+   */
+  @Override
+  public void addTask(final String taskId) {
+    LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), taskId});
+    if (nodes.containsKey(taskId)) {
+      LOG.fine("Got a request to add a task that is already in the graph. " +
+          "We need to block this request till the delete finishes. ***CAUTION***");
+    }
+
+    if (taskId.equals(rootId)) {
+      setRootNode(taskId);
+    } else {
+      addChild(taskId);
+    }
+    prev = nodes.get(taskId);
+    LOG.exiting("TreeTopology", "addTask", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Creates a non-root node for the task and registers it. The node is linked
+   * into the tree right away only if a root already exists; otherwise linking is
+   * deferred until {@link #setRootNode(String)} runs.
+   */
+  private void addChild(final String taskId) {
+    LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), taskId});
+    LOG.finest(getQualifiedName() + "Adding leaf " + taskId);
+    final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, taskId, driverId, false);
+    if (logicalRoot != null) {
+      addTaskNode(node);
+    }
+    nodes.put(taskId, node);
+    LOG.exiting("TreeTopology", "addChild", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Links the node below the logical root, moving the logical root to its
+   * successor first when it already has {@code fanOut} children, and makes the
+   * node the sibling of the previously added node.
+   */
+  private void addTaskNode(final TaskNode node) {
+    LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), node});
+    if (logicalRoot.getNumberOfChildren() >= this.fanOut) {
+      logicalRoot = logicalRoot.successor();
+    }
+    node.setParent(logicalRoot);
+    logicalRoot.addChild(node);
+    prev.setSibling(node);
+    LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + node);
+  }
+
+  /**
+   * Detaches the task's node via the root node (if one was ever set) and drops it from the map.
+   */
+  private void removeChild(final String taskId) {
+    LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), taskId});
+    if (root != null) {
+      // NOTE(review): the node is detached through root, although addTaskNode may have
+      // attached it to a different logical root - confirm TaskNode.removeChild covers that.
+      root.removeChild(nodes.get(taskId));
+    }
+    nodes.remove(taskId);
+    LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Creates the root node, links every node that was added before the root
+   * below it (in map iteration order) and finally registers the root.
+   */
+  private void setRootNode(final String rootId) {
+    LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), rootId});
+    final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, rootId, driverId, true);
+    this.root = node;
+    this.logicalRoot = this.root;
+    this.prev = this.root;
+
+    for (final TaskNode leaf : nodes.values()) {
+      addTaskNode(leaf);
+      this.prev = leaf;
+    }
+    nodes.put(rootId, root);
+    LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + rootId);
+  }
+
+  /**
+   * Removes the root from the map and clears the parent of every remaining node.
+   */
+  private void unsetRootNode(final String taskId) {
+    LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), taskId});
+    nodes.remove(rootId);
+
+    // NOTE(review): root, logicalRoot and prev keep pointing at the old nodes
+    // after this call - confirm whether they should be cleared here.
+    for (final TaskNode leaf : nodes.values()) {
+      leaf.setParent(null);
+    }
+    LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Notifies the node of the given task that its task has failed.
+   *
+   * @throws RuntimeException if the task is not part of this topology
+   */
+  @Override
+  public void onFailedTask(final String taskId) {
+    LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), taskId});
+    final TaskNode taskNode = nodes.get(taskId);
+    if (taskNode == null) {
+      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+    }
+    taskNode.onFailedTask();
+    LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Notifies the node of the given task that its task is running.
+   *
+   * @throws RuntimeException if the task is not part of this topology
+   */
+  @Override
+  public void onRunningTask(final String taskId) {
+    LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), taskId});
+    final TaskNode taskNode = nodes.get(taskId);
+    if (taskNode == null) {
+      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
+    }
+    taskNode.onRunningTask();
+    LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + taskId);
+  }
+
+  /**
+   * Dispatches a message received from a task: {@code TopologyChanges} and
+   * {@code UpdateTopology} are handled by the topology itself, every other
+   * type is passed to the sending task's node as an acknowledgement.
+   *
+   * @throws RuntimeException if the sender of an acknowledgement is not part of this topology
+   */
+  @Override
+  public void onReceiptOfMessage(final GroupCommunicationMessage msg) {
+    LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), msg});
+    switch (msg.getType()) {
+      case TopologyChanges:
+        onTopologyChanges(msg);
+        break;
+      case UpdateTopology:
+        onUpdateTopology(msg);
+        break;
+
+      default: {
+        // Fail with a descriptive message instead of a bare NullPointerException
+        // when the sender is unknown, in line with onFailedTask / onRunningTask.
+        final TaskNode sender = nodes.get(msg.getSrcid());
+        if (sender == null) {
+          throw new RuntimeException(getQualifiedName() + msg.getSrcid() + " does not exist");
+        }
+        sender.onReceiptOfAcknowledgement(msg);
+        break;
+      }
+    }
+    LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + msg);
+  }
+
+  /**
+   * Handles an {@code UpdateTopology} request: asks every running node with
+   * pending changes to update its topology and schedules a
+   * {@link TopologyUpdateWaitHandler} that answers the requester once those
+   * nodes are done.
+   */
+  private void onUpdateTopology(final GroupCommunicationMessage msg) {
+    LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), msg});
+    LOG.fine(getQualifiedName() + "Update affected parts of Topology");
+    final String dstId = msg.getSrcid();
+    final int version = getNodeVersion(dstId);
+
+    LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
+    final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new TopologyUpdateWaitHandler(senderStage, groupName,
+        operName, driverId, 0,
+        dstId, version,
+        getQualifiedName());
+    // NOTE(review): a new stage is created for every UpdateTopology message and is
+    // not closed in this method - confirm how its lifecycle is managed.
+    final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new SingleThreadStage<>("NodeTopologyUpdateWaitStage",
+        topoUpdateWaitHandler,
+        nodes.size());
+
+    final List<TaskNode> toBeUpdatedNodes = new ArrayList<>(nodes.size());
+    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
+    for (final TaskNode node : nodes.values()) {
+      if (node.isRunning() && node.hasChanges() && node.resetTopologySetupSent()) {
+        toBeUpdatedNodes.add(node);
+      }
+    }
+    for (final TaskNode node : toBeUpdatedNodes) {
+      node.updatingTopology();
+      LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology");
+      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
+          node.getVersion(), Utils.EmptyByteArr));
+    }
+    nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
+    LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + msg);
+  }
+
+  /**
+   * Handles a {@code TopologyChanges} query: replies to the requester with an
+   * encoded {@link GroupChanges} telling whether any node is not running or
+   * has pending changes.
+   */
+  private void onTopologyChanges(final GroupCommunicationMessage msg) {
+    LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), msg});
+    LOG.fine(getQualifiedName() + "Check TopologyChanges");
+    final String dstId = msg.getSrcid();
+    boolean hasTopologyChanged = false;
+    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
+    for (final TaskNode node : nodes.values()) {
+      if (!node.isRunning() || node.hasChanges()) {
+        hasTopologyChanged = true;
+        break;
+      }
+    }
+    final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged);
+    final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
+    LOG.fine(getQualifiedName() + "TopologyChanges: " + changes);
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
+        changesCodec.encode(changes)));
+    LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg);
+  }
+
+  /**
+   * Builds the prefix used for log and exception messages of this topology
+   * from the simple names of the group and the operator.
+   */
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + " - ";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
new file mode 100644
index 0000000..82424ce
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/package-info.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This package contains the implementation of the driver side of the
+ * Group Communication Service using the tree/flat topology. The Service
+ * can be configured with many named Communication Groups and each
+ * Communication Group can be configured with many named operators. This
+ * configuration is typically done on the GroupCommDriver object injected
+ * into the driver. During this specification only the root nodes are
+ * specified.
+ *
+ * After the GroupCommDriver is configured the driver would want to submit
+ * tasks with the Communication Groups & their operators configured. To do
+ * that the user has to create a partial task configuration containing his
+ * set of configurations and add it to the relevant communication group.
+ * Based on whether the given task is a Master/Slave different roles for
+ * the operators are configured. Once added, the final Configuration containing
+ * operators and their roles encoded can be obtained by the
+ * CommunicationGroupDriver.getConfiguration() call. The topology is complete
+ * once the minimum number of tasks needed for the group to function have been
+ * added.
+ *
+ * The initial configuration dished out by the service
+ * creates a bunch of tasks that are not connected. Connections are
+ * established when the Tasks start running. Each operator defines its own
+ * topology and can have potentially different root nodes. Each node in the
+ * topology called a TaskNode is a logical representation of a running Task.
+ * Adding a task to a group creates its TaskNode with TaskState NOT_STARTED.
+ *
+ * The driver side of the service plays a major role in setting up the
+ * topology and making sure that topology is set-up only when the parties
+ * involved are ready to participate in the communication. A topology will
+ * not contain parties that are not active. Active status is given to parties
+ * who have acknowledged the presence of their neighbors and who have been
+ * acknowledged by their neighbors as present. The connection between two
+ * parties is initiated by the driver and the driver then expects the parties
+ * to ACK that their end of the connection has been set-up. Once a party has
+ * ACKed its end of all connections and all its neighbors ACK the outgoing part
+ * of their connection to this party the driver sends a TopologySetup msg to
+ * indicate that the topology is usable by this party now. The driver also
+ * listens in on failure events and appropriately updates its state so that
+ * it does not wait for ACKs from failed tasks.
+ *
+ * There are two chains of control:
+ * 1. Driver Events (Running/Failed Tasks)
+ * 2. Tasks (Msgs sent by Task side of Group Communication Service)
+ *
+ * All events and msgs are funneled through the CommunicationGroupDriver so
+ * that all the topologies belonging to different operators configured on the
+ * group are in sync. Without this there is possibility of a deadlock between
+ * the tasks and the driver. So there is no finer level locking other than that
+ * in the CommunicationGroupDriver.
+ *
+ * 1. Driver Events
+ *  These are routed to all communication groups and each communication group
+ *  routes it to all topologies. The topology will then route this event to the
+ *  corresponding TaskNode which will process that event. When a task starts
+ *  running it is notified of its running neighbors and the running neighbors
+ *  are notified of its running. The TaskNodeStatus object keeps track of the
+ *  state of the local topology for this TaskNode. What msgs have been sent to
+ *  this node that need to be ACKed, the status of its neighbors and whether
+ *  this TaskNode is ready to accept data from a neighboring TaskNode when we
+ *  ask the neighbor to check if he was only waiting for this TaskNode to ACK
+ *  in order to send TopologySetup. So when we are sending (Parent|Child)(Add|Dead)
+ *  msgs we first note that we expect an ACK back for this. These ACK expectations
+ *  are then deleted if the node fails. Neighbor failures are also updated.
+ *  All the msg sending is done by the TaskNode. The TaskNodeStatus is only a
+ *  state manager to consult on the status of ACKs and neighbors. This is needed
+ *  by the chkAndSendTopSetup logic. These events also send msgs related to failure
+ *  of tasks so that any task in the topology that waited for a response from the
+ *  failed task can move on.
+ *
+ * 2. Tasks
+ *  We get ACK msgs from tasks and they update the status of ACK expectations.
+ *  Here the TaskNodeStatus acts as a bridge between the initiation of a link
+ *  between two parties and the final set-up of the link. Once all ACKs have
+ *  been received we ask the TaskNode to check if it is ready to send a
+ *  TopologySetup msg. Every ACK can also trigger the chkAndSendTopSetup for
+ *  a neighbor.
+ *
+ * The above concerns the topology set-up and fault notifications. However, the
+ * other major task that the driver helps with is in updating a topology. When
+ * requested for the update of a Topology using the UpdateTopology msg, the
+ * driver notifies all the parties that have to update their topologies by
+ * sending an UpdateTopology msg to the affected parties. The tasks then try to
+ * enter the UpdateTopology phase and as soon as they can do so (by acquiring a lock)
+ * they respond that they have done so. The driver will wait till all the affected
+ * parties do so and then sends the initiator a msg that Topology has been updated.
+ * The effect of this is that the unaffected regions of the topology can continue
+ * to work normally while the affected regions are healing. The affected regions
+ * heal their (local)topologies using the TopologySetup mechanism described above.
+ * Both update topology and initial set-up use the minimum number of tasks to define
+ * when the set-up is complete.
+ *
+ * The CommGroupDriver class also takes care of minor alterations to event ordering
+ * by the use of locks to coerce the events to occur in a definite way.
+ *
+ */
+package org.apache.reef.io.network.group.impl.driver;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
new file mode 100644
index 0000000..b8dd425
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * Receiving side of the Broadcast operator.
+ *
+ * On {@link #receive()} the raw bytes obtained from the parent in the operator
+ * topology are decoded with the configured data codec and the same raw bytes
+ * are forwarded to this node's children. Incoming group communication
+ * messages delivered through {@link #onNext(GroupCommunicationMessage)} are
+ * handed to the operator topology.
+ *
+ * @param <T> type of the broadcast element
+ */
+public class BroadcastReceiver<T> implements Broadcast.Receiver<T>, EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = Logger.getLogger(BroadcastReceiver.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final CommGroupNetworkHandler commGroupNetworkHandler;
+  private final Codec<T> dataCodec;
+  private final NetworkService<GroupCommunicationMessage> netService;
+  private final Sender sender;
+
+  private final OperatorTopology topology;
+
+  /** Guards the one-time initialization of the communication group client. */
+  private final AtomicBoolean init = new AtomicBoolean(false);
+
+  private final CommunicationGroupServiceClient commGroupClient;
+
+  private final int version;
+
+  /**
+   * Creates the receiver, builds its operator topology and registers it as
+   * the handler for this operator's messages.
+   *
+   * @param groupName               name of the communication group (resolved to a class via Utils.getClass)
+   * @param operName                name of the operator (resolved to a class via Utils.getClass)
+   * @param selfId                  identifier of the task this operator runs in
+   * @param dataCodec               codec used to decode received data
+   * @param driverId                identifier of the driver
+   * @param version                 version of this task
+   * @param commGroupNetworkHandler handler with which this operator registers itself
+   * @param netService              network service used to build the message sender
+   * @param commGroupClient         client that is initialized on the first call to receive()
+   */
+  @Inject
+  public BroadcastReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
+                           @Parameter(OperatorName.class) final String operName,
+                           @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
+                           @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                           @Parameter(DriverIdentifier.class) final String driverId,
+                           @Parameter(TaskVersion.class) final int version,
+                           final CommGroupNetworkHandler commGroupNetworkHandler,
+                           final NetworkService<GroupCommunicationMessage> netService,
+                           final CommunicationGroupServiceClient commGroupClient) {
+    this.version = version;
+    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.commGroupNetworkHandler = commGroupNetworkHandler;
+    this.netService = netService;
+    this.sender = new Sender(this.netService);
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+    this.commGroupClient = commGroupClient;
+    // Register last: this hands out a reference to 'this', so every field
+    // must already be assigned before the handler can call back into us.
+    this.commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  /**
+   * @return the task version this operator was created with
+   */
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * Initializes the underlying operator topology.
+   *
+   * @throws ParentDeadException propagated from the topology initialization
+   */
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  /**
+   * @return the class naming this operator
+   */
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  /**
+   * @return the class naming the communication group of this operator
+   */
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    return "BroadcastReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+  }
+
+  /**
+   * Hands an incoming group communication message to the operator topology.
+   *
+   * @param msg the message to handle
+   */
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  /**
+   * Receives the broadcast element from the parent and forwards the raw data
+   * to the children. The first call also initializes the communication group
+   * client.
+   *
+   * @return the decoded element, or {@code null} if the parent delivered no data
+   * @throws NetworkException     declared by the Broadcast.Receiver contract
+   * @throws InterruptedException declared by the Broadcast.Receiver contract
+   * @throws RuntimeException     wrapping a ParentDeadException raised by the topology
+   */
+  @Override
+  public T receive() throws NetworkException, InterruptedException {
+    LOG.entering("BroadcastReceiver", "receive", this);
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized");
+    }
+    // I am an intermediate node or leaf.
+
+    final T retVal;
+    // Wait for parent to send
+    LOG.fine(this + " Waiting to receive broadcast");
+    try {
+      final byte[] data = topology.recvFromParent();
+      // TODO: Should receive the identity element instead of null
+      if (data == null) {
+        LOG.fine(this + " Received null. Perhaps one of my ancestors is dead.");
+        retVal = null;
+      } else {
+        LOG.finest("Using " + dataCodec.getClass().getSimpleName() + " as codec");
+        retVal = dataCodec.decode(data);
+        LOG.finest("Decoded msg successfully");
+        LOG.fine(this + " Received: " + retVal);
+      }
+
+      // The raw data is forwarded in both cases (null included), so the log
+      // entry belongs next to the call rather than in the non-null branch.
+      LOG.finest(this + " Sending to children.");
+      topology.sendToChildren(data, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    }
+    LOG.exiting("BroadcastReceiver", "receive", Arrays.toString(new Object[]{retVal, this}));
+    return retVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
new file mode 100644
index 0000000..6f146e6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * Sending side of the Broadcast operator.
+ *
+ * On {@link #send(Object)} the element is encoded with the configured data
+ * codec and sent to this node's children in the operator topology. Incoming
+ * group communication messages delivered through
+ * {@link #onNext(GroupCommunicationMessage)} are handed to the operator
+ * topology.
+ *
+ * @param <T> type of the broadcast element
+ */
+public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = Logger.getLogger(BroadcastSender.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final CommGroupNetworkHandler commGroupNetworkHandler;
+  private final Codec<T> dataCodec;
+  private final NetworkService<GroupCommunicationMessage> netService;
+  private final Sender sender;
+
+  private final OperatorTopology topology;
+
+  /** Guards the one-time initialization of the communication group client. */
+  private final AtomicBoolean init = new AtomicBoolean(false);
+
+  private final CommunicationGroupServiceClient commGroupClient;
+
+  private final int version;
+
+  /**
+   * Creates the sender, builds its operator topology and registers it as the
+   * handler for this operator's messages.
+   *
+   * @param groupName               name of the communication group (resolved to a class via Utils.getClass)
+   * @param operName                name of the operator (resolved to a class via Utils.getClass)
+   * @param selfId                  identifier of the task this operator runs in
+   * @param dataCodec               codec used to encode the elements to send
+   * @param driverId                identifier of the driver
+   * @param version                 version of this task
+   * @param commGroupNetworkHandler handler with which this operator registers itself
+   * @param netService              network service used to build the message sender
+   * @param commGroupClient         client that is initialized on the first call to send()
+   */
+  @Inject
+  public BroadcastSender(@Parameter(CommunicationGroupName.class) final String groupName,
+                         @Parameter(OperatorName.class) final String operName,
+                         @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
+                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                         @Parameter(DriverIdentifier.class) final String driverId,
+                         @Parameter(TaskVersion.class) final int version,
+                         final CommGroupNetworkHandler commGroupNetworkHandler,
+                         final NetworkService<GroupCommunicationMessage> netService,
+                         final CommunicationGroupServiceClient commGroupClient) {
+    this.version = version;
+    // Leading space keeps the operator name separated from the text, matching BroadcastReceiver.
+    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.commGroupNetworkHandler = commGroupNetworkHandler;
+    this.netService = netService;
+    this.sender = new Sender(this.netService);
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+    this.commGroupClient = commGroupClient;
+    // Register last: this hands out a reference to 'this', so every field
+    // must already be assigned before the handler can call back into us.
+    this.commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  /**
+   * @return the task version this operator was created with
+   */
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * Initializes the underlying operator topology.
+   *
+   * @throws ParentDeadException propagated from the topology initialization
+   */
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  /**
+   * @return the class naming this operator
+   */
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  /**
+   * @return the class naming the communication group of this operator
+   */
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    return "BroadcastSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+  }
+
+  /**
+   * Hands an incoming group communication message to the operator topology.
+   *
+   * @param msg the message to handle
+   */
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  /**
+   * Encodes the element and sends it to the children in the operator
+   * topology. The first call also initializes the communication group client.
+   *
+   * @param element the element to broadcast
+   * @throws NetworkException     declared by the Broadcast.Sender contract
+   * @throws InterruptedException declared by the Broadcast.Sender contract
+   * @throws RuntimeException     wrapping a ParentDeadException raised by the topology
+   */
+  @Override
+  public void send(final T element) throws NetworkException, InterruptedException {
+    LOG.entering("BroadcastSender", "send", new Object[]{this, element});
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized");
+    }
+
+    try {
+      LOG.fine(this + " Broadcasting " + element);
+      topology.sendToChildren(dataCodec.encode(element), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    }
+    LOG.exiting("BroadcastSender", "send", Arrays.toString(new Object[]{this, element}));
+  }
+
+}