You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/03/06 02:55:35 UTC

[5/8] incubator-reef git commit: [REEF-118] Add Shimoga library for elastic group communication.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
new file mode 100644
index 0000000..792b655
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Reduce Scatter operator.
+ * <p/>
+ * Each task has a list of elements. Assume that each task reduces
+ * each element in the list to form a list of reduced elements at a dummy root.
+ * The dummy root then keeps the portion of the list assigned to it and
+ * scatters the remaining among the other tasks
+ */
+public interface ReduceScatter<T> extends GroupCommOperator {
+
+  /**
+   * Apply this operation on elements where counts specify the distribution of
+   * elements to each task. Ordering is assumed to be default.
+   * <p/>
+   * Here counts is of the same size as the entire group not just children.
+   *
+   * @return List of values that result from applying reduce function on
+   * corresponding elements of each list received as a result of
+   * applying scatter.
+   */
+  List<T> apply(List<T> elements, List<Integer> counts) throws InterruptedException, NetworkException;
+
+  /**
+   * Apply this operation on elements where counts specify the distribution of
+   * elements to each task. Ordering is specified using order
+   * <p/>
+   * Here counts is of the same size as the entire group not just children
+   *
+   * @return List of values that result from applying reduce function on
+   * corresponding elements of each list received as a result of
+   * applying scatter.
+   */
+  List<T> apply(List<T> elements, List<Integer> counts,
+                List<? extends Identifier> order) throws InterruptedException, NetworkException;
+
+  /**
+   * get {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} configured
+   *
+   * @return {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction}
+   */
+  Reduce.ReduceFunction<T> getReduceFunction();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
new file mode 100644
index 0000000..6c58d61
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Scatter operator
+ * <p/>
+ * Scatter a list of elements to the receivers The receivers will receive a
+ * sub-list of elements targeted for them. Supports non-uniform distribution
+ * through the specification of counts
+ */
+public interface Scatter {
+
+  /**
+   * Sender or Root.
+   */
+  static interface Sender<T> extends GroupCommOperator {
+
+    /**
+     * Distributes evenly across task ids sorted lexicographically.
+     */
+    void send(List<T> elements) throws NetworkException, InterruptedException;
+
+    /**
+     * Distributes as per counts across task ids sorted lexicographically.
+     */
+    void send(List<T> elements, Integer... counts) throws NetworkException, InterruptedException;
+
+    /**
+     * Distributes evenly across task ids sorted using order.
+     */
+    void send(List<T> elements, List<? extends Identifier> order)
+        throws NetworkException, InterruptedException;
+
+    /**
+     * Distributes as per counts across task ids sorted using order.
+     */
+    void send(List<T> elements, List<Integer> counts,
+              List<? extends Identifier> order) throws NetworkException, InterruptedException;
+  }
+
+  /**
+   * Receiver or non-roots.
+   */
+  static interface Receiver<T> extends GroupCommOperator {
+    /**
+     * Receive the sub-list of elements targeted for the current receiver.
+     *
+     * @return list of elements targeted for the current receiver.
+     */
+    List<T> receive() throws InterruptedException, NetworkException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
new file mode 100644
index 0000000..d2e600c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides the interfaces for MPI style group communication operations.
+ * The interface is asymmetric for asymmetric operations and symmetric
+ * for symmetric operations unlike MPI which provides symmetric operations
+ * for all operations.
+ *
+ * The interface is asymmetric in the sense that Senders & Receivers are
+ * separated out for operations like Scatter and Gather. All participants
+ * do not execute the same function. A sender sends & a receiver receives.
+ *
+ * The interface only concentrates on the data part because we are on the
+ * data-plane of things in REEF. The control information is embedded in the
+ * {@link org.apache.reef.tang.Configuration} used to instantiate these
+ * operators. It is the responsibility of the Driver, the primary agent in
+ * the control-plane to configure these operators, that is, denote who is
+ * the sender, who are the receivers, what {@link org.apache.reef.io.serialization.Codec}
+ * need to be used and so on for an operation like Scatter with the root node
+ * acting as a sender and the other nodes as receivers.
+ *
+ * One thing implicit in MPI operations is the ordering of processors based
+ * on their ranks which determines the order of operations. For ex., if we
+ * scatter an array of 10 elements into 10 processors, then which processor
+ * gets the 1st entry & so on is based on the rank.
+ *
+ * In our case we do not have any ranks associated with tasks. Instead,
+ * by default we use the lexicographic order of the task ids. These can
+ * also be over-ridden in the send/receive/apply function calls
+ */
+package org.apache.reef.io.network.group.api.operators;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
new file mode 100644
index 0000000..45b1839
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.task.CommGroupNetworkHandlerImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The EventHandler that receives the GroupCommunicationMsg
+ * pertaining to a specific Communication Group
+ */
+@DefaultImplementation(value = CommGroupNetworkHandlerImpl.class)
+public interface CommGroupNetworkHandler extends EventHandler<GroupCommunicationMessage> {
+
+  void register(Class<? extends Name<String>> operName, EventHandler<GroupCommunicationMessage> handler);
+
+  void addTopologyElement(Class<? extends Name<String>> operName);
+
+  GroupCommunicationMessage waitForTopologyUpdate(Class<? extends Name<String>> operName);
+
+  byte[] waitForTopologyChanges(Class<? extends Name<String>> operName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
new file mode 100644
index 0000000..a1370e5
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The Task side interface of a communication group
+ * Lets one get the operators configured for this task
+ * and use them for communication between tasks configured
+ * in this communication group
+ */
+@TaskSide
+@DefaultImplementation(value = CommunicationGroupClientImpl.class)
+public interface CommunicationGroupClient {
+
+  /**
+   * @return The name configured on this communication group
+   */
+  Class<? extends Name<String>> getName();
+
+  /**
+   * The broadcast sender configured on this communication group
+   * with the given oepratorName
+   *
+   * @param operatorName
+   * @return
+   */
+  Broadcast.Sender getBroadcastSender(Class<? extends Name<String>> operatorName);
+
+  /**
+   * The broadcast receiver configured on this communication group
+   * with the given oepratorName
+   *
+   * @param operatorName
+   * @return
+   */
+  Broadcast.Receiver getBroadcastReceiver(Class<? extends Name<String>> operatorName);
+
+  /**
+   * The reduce receiver configured on this communication group
+   * with the given oepratorName
+   *
+   * @param operatorName
+   * @return
+   */
+  Reduce.Receiver getReduceReceiver(Class<? extends Name<String>> operatorName);
+
+  /**
+   * The reduce sender configured on this communication group
+   * with the given oepratorName
+   *
+   * @param operatorName
+   * @return
+   */
+  Reduce.Sender getReduceSender(Class<? extends Name<String>> operatorName);
+
+  /**
+   * @return Changes in topology of this communication group since the last time
+   * this method was called
+   */
+  GroupChanges getTopologyChanges();
+
+  /**
+   * Asks the driver to update the topology of this communication group. This can
+   * be an expensive call depending on what the minimum number of tasks is for this
+   * group to function as this first tells the driver, driver then tells the affected
+   * tasks and the driver gives a green only after affected tasks have had a chance
+   * to be sure that their topology will be updated before the next message is
+   * communicated
+   */
+  void updateTopology();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
new file mode 100644
index 0000000..b75af08
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+@Private
+@DefaultImplementation(value = CommunicationGroupClientImpl.class)
+public interface CommunicationGroupServiceClient extends CommunicationGroupClient {
+  /**
+   * Should not be used by user code
+   * Used for initialization of the
+   * communication group
+   */
+  void initialize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
new file mode 100644
index 0000000..14c811b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.group.impl.task.GroupCommClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+
+
+/**
+ * The task side interface for the Group Communication Service
+ */
+@TaskSide
+@Provided
+@DefaultImplementation(value = GroupCommClientImpl.class)
+public interface GroupCommClient {
+
+  /**
+   * @param string
+   * @return The communication group client with the given name that gives access
+   * to the operators configured on it that will be used to do group communication
+   */
+  CommunicationGroupClient getCommunicationGroup(Class<? extends Name<String>> groupName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
new file mode 100644
index 0000000..ecea973
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The global EventHandler that receives the GroupCommunicationMsg
+ * and routes it to the relevant communication group
+ */
+@TaskSide
+@DefaultImplementation(value = GroupCommNetworkHandlerImpl.class)
+public interface GroupCommNetworkHandler extends EventHandler<Message<GroupCommunicationMessage>> {
+
+  void register(Class<? extends Name<String>> groupName, EventHandler<GroupCommunicationMessage> commGroupNetworkHandler);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
new file mode 100644
index 0000000..08554f8
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+
+/**
+ * The actual node that is part of the operator topology
+ *
+ * Receives data from the handlers & provides them to the
+ * operators/OperatorTopologyStruct when they need it.
+ *
+ * This implementation decouples the send & receive.
+ */
+public interface NodeStruct {
+
+  String getId();
+
+  int getVersion();
+
+  void setVersion(int version);
+
+  byte[] getData();
+
+  void addData(GroupCommunicationMessage msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
new file mode 100644
index 0000000..62b6934
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * Represents the local topology of tasks for an operator. It
+ * provides methods to send/rcv from parents & children
+ * <p/>
+ * Every operator is an EventHandler<GroupCommunicationMessage>
+ * and it will use an instance of this type to delegate the
+ * handling of the message and also uses it to communicate
+ * with its parents and children
+ * <p/>
+ * This is an operator facing interface. The actual topology is
+ * maintained in OperatorTopologyStruct. Current strategy is to
+ * maintain two versions of the topology and current operations
+ * are always delegated to effectiveTopology and the baseTopology
+ * is updated while initialization & when user calls updateTopology.
+ * So this is only a wrapper around the two versions of topologies
+ * and manages when to create/update them based on the messages from
+ * the driver.
+ */
+public interface OperatorTopology {
+
+  void handle(GroupCommunicationMessage msg);
+
+  void sendToParent(byte[] encode, ReefNetworkGroupCommProtos.GroupCommMessage.Type reduce) throws ParentDeadException;
+
+  byte[] recvFromParent() throws ParentDeadException;
+
+  void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException;
+
+  <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec) throws ParentDeadException;
+
+  void initialize() throws ParentDeadException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
new file mode 100644
index 0000000..dd262c9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.operators.Sender;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * The actual local topology maintaining the
+ * children and parent that reacts to update
+ * and data msgs. The actual nodes are represented
+ * by NodeStruct and it handles receiving &
+ * providing data
+ */
+public interface OperatorTopologyStruct {
+
+  Class<? extends Name<String>> getGroupName();
+
+  Class<? extends Name<String>> getOperName();
+
+  String getSelfId();
+
+  int getVersion();
+
+  NodeStruct getParent();
+
+  Collection<? extends NodeStruct> getChildren();
+
+  String getDriverId();
+
+  Sender getSender();
+
+  boolean hasChanges();
+
+  void setChanges(boolean b);
+
+  void addAsData(GroupCommunicationMessage msg);
+
+  void update(Set<GroupCommunicationMessage> deletionDeltas);
+
+  void update(GroupCommunicationMessage msg);
+
+  void sendToParent(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
+  byte[] recvFromParent();
+
+  void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
+  <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
new file mode 100644
index 0000000..46eb79e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.serialization.Codec;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class GroupChangesCodec implements Codec<GroupChanges> {
+
+  @Inject
+  public GroupChangesCodec() {
+  }
+
+  @Override
+  public GroupChanges decode(final byte[] changeBytes) {
+    return new GroupChangesImpl(changeBytes[0] == 1);
+  }
+
+  @Override
+  public byte[] encode(final GroupChanges changes) {
+    final byte[] retVal = new byte[1];
+    if (changes.exist()) {
+      retVal[0] = 1;
+    }
+    return retVal;
+  }
+
+  public static void main(final String[] args) {
+    GroupChanges changes = new GroupChangesImpl(false);
+    final GroupChangesCodec changesCodec = new GroupChangesCodec();
+    GroupChanges changes1 = changesCodec.decode(changesCodec.encode(changes));
+    test(changes, changes1);
+    changes = new GroupChangesImpl(true);
+    changes1 = changesCodec.decode(changesCodec.encode(changes));
+    test(changes, changes1);
+  }
+
+  private static void test(final GroupChanges changes, final GroupChanges changes1) {
+
+    final Logger LOG = Logger.getLogger(GroupChangesCodec.class.getName());
+
+    final boolean c1 = changes.exist();
+    final boolean c2 = changes1.exist();
+
+    if (c1 != c2) {
+      LOG.log(Level.SEVERE, "Something is wrong: {0} != {1}", new Object[] {c1, c2});
+    } else {
+      LOG.log(Level.INFO, "Codec is fine");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
new file mode 100644
index 0000000..48da7e7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.api.GroupChanges;
+
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+@Immutable
+@ThreadSafe
+public class GroupChangesImpl implements GroupChanges {
+
+  private final boolean changes;
+
+  public GroupChangesImpl(final boolean changes) {
+    this.changes = changes;
+  }
+
+  @Override
+  public boolean exist() {
+    return changes;
+  }
+
+  @Override
+  public String toString() {
+    return "Changes: " + changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
new file mode 100644
index 0000000..c02d6af
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+
+import java.util.Arrays;
+
+/**
+ *
+ */
+public class GroupCommunicationMessage {
+  private final String groupName;
+  private final String operName;
+  private final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType;
+  private final String from;
+  private final int srcVersion;
+  private final String to;
+  private final int dstVersion;
+  private final byte[][] data;
+
+  private final String simpleGroupName;
+  private final String simpleOperName;
+
+  public GroupCommunicationMessage(
+      final String groupName,
+      final String operName,
+      final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+      final String from, final int srcVersion,
+      final String to, final int dstVersion,
+      final byte[][] data) {
+    super();
+    this.groupName = groupName;
+    this.operName = operName;
+    this.msgType = msgType;
+    this.from = from;
+    this.srcVersion = srcVersion;
+    this.to = to;
+    this.dstVersion = dstVersion;
+    this.data = data;
+    this.simpleGroupName = Utils.simpleName(Utils.getClass(groupName));
+    this.simpleOperName = Utils.simpleName(Utils.getClass(operName));
+  }
+
+  public String getGroupname() {
+    return groupName;
+  }
+
+  public String getOperatorname() {
+    return operName;
+  }
+
+  public String getSimpleOperName() {
+    return simpleOperName;
+  }
+
+  public ReefNetworkGroupCommProtos.GroupCommMessage.Type getType() {
+    return msgType;
+  }
+
+  public String getSrcid() {
+    return from;
+  }
+
+  public int getSrcVersion() {
+    return srcVersion;
+  }
+
+  public String getDestid() {
+    return to;
+  }
+
+  public int getVersion() {
+    return dstVersion;
+  }
+
+  public String getSource() {
+    return "(" + getSrcid() + "," + getSrcVersion() + ")";
+  }
+
+  public String getDestination() {
+    return "(" + getDestid() + "," + getVersion() + ")";
+  }
+
+  public byte[][] getData() {
+    return data;
+  }
+
+  public int getMsgsCount() {
+    return data.length;
+  }
+
+  public boolean hasVersion() {
+    return true;
+  }
+
+  public boolean hasSrcVersion() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + simpleOperName + "]";
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this != obj) {
+      if (obj instanceof GroupCommunicationMessage) {
+        final GroupCommunicationMessage that = (GroupCommunicationMessage) obj;
+        if (!this.groupName.equals(that.groupName)) {
+          return false;
+        }
+        if (!this.operName.equals(that.operName)) {
+          return false;
+        }
+        if (!this.from.equals(that.from)) {
+          return false;
+        }
+        if (this.srcVersion != that.srcVersion) {
+          return false;
+        }
+        if (!this.to.equals(that.to)) {
+          return false;
+        }
+        if (this.dstVersion != that.dstVersion) {
+          return false;
+        }
+        if (!this.msgType.equals(that.msgType)) {
+          return false;
+        }
+        if (this.data.length != that.data.length) {
+          return false;
+        }
+        for (int i = 0; i < data.length; i++) {
+          if (!Arrays.equals(this.data[i], that.data[i])) {
+            return false;
+          }
+        }
+
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      return true;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
new file mode 100644
index 0000000..8b5225d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
+
+import javax.inject.Inject;
+import java.io.*;
+
+/**
+ * Codec for {@link GroupCommMessage}
+ */
+public class GroupCommunicationMessageCodec implements StreamingCodec<GroupCommunicationMessage> {
+
+  @Inject
+  public GroupCommunicationMessageCodec() {
+    // Intentionally Blank
+  }
+
+  @Override
+  public GroupCommunicationMessage decode(final byte[] data) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        return decodeFromStream(dais);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+  @Override
+  public GroupCommunicationMessage decodeFromStream(final DataInputStream stream) {
+    try {
+      final String groupName = stream.readUTF();
+      final String operName = stream.readUTF();
+      final Type msgType = Type.valueOf(stream.readInt());
+      final String from = stream.readUTF();
+      final int srcVersion = stream.readInt();
+      final String to = stream.readUTF();
+      final int dstVersion = stream.readInt();
+      final byte[][] gcmData = new byte[stream.readInt()][];
+      for (int i = 0; i < gcmData.length; i++) {
+        gcmData[i] = new byte[stream.readInt()];
+        stream.readFully(gcmData[i]);
+      }
+      return new GroupCommunicationMessage(
+          groupName,
+          operName,
+          msgType,
+          from,
+          srcVersion,
+          to,
+          dstVersion,
+          gcmData);
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+  @Override
+  public byte[] encode(final GroupCommunicationMessage msg) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataOutputStream daos = new DataOutputStream(baos)) {
+        encodeToStream(msg, daos);
+      }
+      return baos.toByteArray();
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+  @Override
+  public void encodeToStream(final GroupCommunicationMessage msg, final DataOutputStream stream) {
+    try {
+      stream.writeUTF(msg.getGroupname());
+      stream.writeUTF(msg.getOperatorname());
+      stream.writeInt(msg.getType().getNumber());
+      stream.writeUTF(msg.getSrcid());
+      stream.writeInt(msg.getSrcVersion());
+      stream.writeUTF(msg.getDestid());
+      stream.writeInt(msg.getVersion());
+      stream.writeInt(msg.getMsgsCount());
+      for (final byte[] b : msg.getData()) {
+        stream.writeInt(b.length);
+        stream.write(b);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
new file mode 100644
index 0000000..11514b1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+
+/**
+ * The specification for the broadcast operator
+ */
+public class BroadcastOperatorSpec implements OperatorSpec {
+  private final String senderId;
+
+  /**
+   * Codec to be used to serialize data
+   */
+  private final Class<? extends Codec> dataCodecClass;
+
+
+  public BroadcastOperatorSpec(final String senderId,
+                               final Class<? extends Codec> dataCodecClass) {
+    super();
+    this.senderId = senderId;
+    this.dataCodecClass = dataCodecClass;
+  }
+
+  public String getSenderId() {
+    return senderId;
+  }
+
+  @Override
+  public Class<? extends Codec> getDataCodecClass() {
+    return dataCodecClass;
+  }
+
+  @Override
+  public String toString() {
+    return "Broadcast Operator Spec: [sender=" + senderId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass)
+        + "]";
+  }
+
+  public static Builder newBuilder() {
+    return new BroadcastOperatorSpec.Builder();
+  }
+
+  public static class Builder implements org.apache.reef.util.Builder<BroadcastOperatorSpec> {
+    private String senderId;
+
+    private Class<? extends Codec> dataCodecClass;
+
+
+    public Builder setSenderId(final String senderId) {
+      this.senderId = senderId;
+      return this;
+    }
+
+    public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) {
+      this.dataCodecClass = codecClazz;
+      return this;
+    }
+
+    @Override
+    public BroadcastOperatorSpec build() {
+      return new BroadcastOperatorSpec(senderId, dataCodecClass);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
new file mode 100644
index 0000000..386ad13
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Reduce operator
+ */
+public class ReduceOperatorSpec implements OperatorSpec {
+
+  private final String receiverId;
+
+  /**
+   * Codec to be used to serialize data
+   */
+  private final Class<? extends Codec> dataCodecClass;
+
+  /**
+   * The reduce function to be used for operations that do reduction
+   */
+  public final Class<? extends ReduceFunction> redFuncClass;
+
+
+  public ReduceOperatorSpec(final String receiverId,
+                            final Class<? extends Codec> dataCodecClass,
+                            final Class<? extends ReduceFunction> redFuncClass) {
+    super();
+    this.receiverId = receiverId;
+    this.dataCodecClass = dataCodecClass;
+    this.redFuncClass = redFuncClass;
+  }
+
+  public String getReceiverId() {
+    return receiverId;
+  }
+
+  /**
+   * @return the redFuncClass
+   */
+  public Class<? extends ReduceFunction> getRedFuncClass() {
+    return redFuncClass;
+  }
+
+  @Override
+  public Class<? extends Codec> getDataCodecClass() {
+    return dataCodecClass;
+  }
+
+  @Override
+  public String toString() {
+    return "Reduce Operator Spec: [receiver=" + receiverId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass)
+        + "] [reduceFunctionClass=" + Utils.simpleName(redFuncClass) + "]";
+  }
+
+  public static Builder newBuilder() {
+    return new ReduceOperatorSpec.Builder();
+  }
+
+  public static class Builder implements org.apache.reef.util.Builder<ReduceOperatorSpec> {
+
+    private String receiverId;
+
+    private Class<? extends Codec> dataCodecClass;
+
+    private Class<? extends ReduceFunction> redFuncClass;
+
+    public Builder setReceiverId(final String receiverId) {
+      this.receiverId = receiverId;
+      return this;
+    }
+
+    public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) {
+      this.dataCodecClass = codecClazz;
+      return this;
+    }
+
+    public Builder setReduceFunctionClass(final Class<? extends ReduceFunction> redFuncClass) {
+      this.redFuncClass = redFuncClass;
+      return this;
+    }
+
+    @Override
+    public ReduceOperatorSpec build() {
+      return new ReduceOperatorSpec(receiverId, dataCodecClass, redFuncClass);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
new file mode 100644
index 0000000..49dab96
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Name of the comm group")
+public final class CommunicationGroupName implements Name<String> {
+  private CommunicationGroupName() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
new file mode 100644
index 0000000..82613a7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Codec used to serialize and deserialize data in operators")
+public final class DataCodec implements Name<Codec> {
+  private DataCodec() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
new file mode 100644
index 0000000..9098577
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Name of the operator")
+public final class OperatorName implements Name<String> {
+  private OperatorName() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
new file mode 100644
index 0000000..d37408b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The reduce function class that is associated with a reduce operator")
+public final class ReduceFunctionParam implements Name<ReduceFunction> {
+  private ReduceFunctionParam() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
new file mode 100644
index 0000000..1844766
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+@NamedParameter(doc = "Serialized communication group configurations")
+public final class SerializedGroupConfigs implements Name<Set<String>> {
+  private SerializedGroupConfigs() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
new file mode 100644
index 0000000..e0103f5
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+@NamedParameter(doc = "Serialized operator configurations")
+public final class SerializedOperConfigs implements Name<Set<String>> {
+  private SerializedOperConfigs() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
new file mode 100644
index 0000000..b1caa84
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The version that this task is assigned")
+public final class TaskVersion implements Name<Integer> {
+  private TaskVersion() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
new file mode 100644
index 0000000..c759130
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The fan out for the tree topology", default_value = "2", short_name = "fanout")
+public final class TreeTopologyFanOut implements Name<Integer> {
+  private TreeTopologyFanOut() {
+  }
+}