You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/03/06 02:55:35 UTC
[5/8] incubator-reef git commit: [REEF-118] Add Shimoga library for
elastic group communication.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
new file mode 100644
index 0000000..792b655
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Reduce Scatter operator.
+ * <p/>
+ * Each task has a list of elements. Assume that each task reduces
+ * each element in the list to form a list of reduced elements at a dummy root.
+ * The dummy root then keeps the portion of the list assigned to it and
+ * scatters the remaining among the other tasks
+ */
+public interface ReduceScatter<T> extends GroupCommOperator {
+
+ /**
+ * Apply this operation on elements where counts specify the distribution of
+ * elements to each task. Ordering is assumed to be default.
+ * <p/>
+ * Here counts is of the same size as the entire group not just children.
+ *
+ * @return List of values that result from applying reduce function on
+ * corresponding elements of each list received as a result of
+ * applying scatter.
+ */
+ List<T> apply(List<T> elements, List<Integer> counts) throws InterruptedException, NetworkException;
+
+ /**
+ * Apply this operation on elements where counts specify the distribution of
+ * elements to each task. Ordering is specified using order
+ * <p/>
+ * Here counts is of the same size as the entire group not just children
+ *
+ * @return List of values that result from applying reduce function on
+ * corresponding elements of each list received as a result of
+ * applying scatter.
+ */
+ List<T> apply(List<T> elements, List<Integer> counts,
+ List<? extends Identifier> order) throws InterruptedException, NetworkException;
+
+ /**
+ * get {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} configured
+ *
+ * @return {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction}
+ */
+ Reduce.ReduceFunction<T> getReduceFunction();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
new file mode 100644
index 0000000..6c58d61
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Scatter operator
+ * <p/>
+ * Scatter a list of elements to the receivers The receivers will receive a
+ * sub-list of elements targeted for them. Supports non-uniform distribution
+ * through the specification of counts
+ */
+public interface Scatter {
+
+ /**
+ * Sender or Root.
+ */
+ static interface Sender<T> extends GroupCommOperator {
+
+ /**
+ * Distributes evenly across task ids sorted lexicographically.
+ */
+ void send(List<T> elements) throws NetworkException, InterruptedException;
+
+ /**
+ * Distributes as per counts across task ids sorted lexicographically.
+ */
+ void send(List<T> elements, Integer... counts) throws NetworkException, InterruptedException;
+
+ /**
+ * Distributes evenly across task ids sorted using order.
+ */
+ void send(List<T> elements, List<? extends Identifier> order)
+ throws NetworkException, InterruptedException;
+
+ /**
+ * Distributes as per counts across task ids sorted using order.
+ */
+ void send(List<T> elements, List<Integer> counts,
+ List<? extends Identifier> order) throws NetworkException, InterruptedException;
+ }
+
+ /**
+ * Receiver or non-roots.
+ */
+ static interface Receiver<T> extends GroupCommOperator {
+ /**
+ * Receive the sub-list of elements targeted for the current receiver.
+ *
+ * @return list of elements targeted for the current receiver.
+ */
+ List<T> receive() throws InterruptedException, NetworkException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
new file mode 100644
index 0000000..d2e600c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides the interfaces for MPI style group communication operations.
+ * The interface is asymmetric for asymmetric operations and symmetric
+ * for symmetric operations unlike MPI which provides symmetric operations
+ * for all operations.
+ *
+ * The interface is asymmetric in the sense that Senders & Receivers are
+ * separated out for operations like Scatter and Gather. All participants
+ * do not execute the same function. A sender sends & a receiver receives.
+ *
+ * The interface only concentrates on the data part because we are on the
+ * data-plane of things in REEF. The control information is embedded in the
+ * {@link org.apache.reef.tang.Configuration} used to instantiate these
+ * operators. It is the responsibility of the Driver, the primary agent in
+ * the control-plane to configure these operators, that is, denote who is
+ * the sender, who are the receivers, what {@link org.apache.reef.io.serialization.Codec}
+ * need to be used and so on for an operation like Scatter with the root node
+ * acting as a sender and the other nodes as receivers.
+ *
+ * One thing implicit in MPI operations is the ordering of processors based
+ * on their ranks which determines the order of operations. For ex., if we
+ * scatter an array of 10 elements into 10 processors, then which processor
+ * gets the 1st entry & so on is based on the rank.
+ *
+ * In our case we do not have any ranks associated with tasks. Instead,
+ * by default we use the lexicographic order of the task ids. These can
+ * also be over-ridden in the send/receive/apply function calls
+ */
+package org.apache.reef.io.network.group.api.operators;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
new file mode 100644
index 0000000..45b1839
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.task.CommGroupNetworkHandlerImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The EventHandler that receives the GroupCommunicationMsg
+ * pertaining to a specific Communication Group
+ */
+@DefaultImplementation(value = CommGroupNetworkHandlerImpl.class)
+public interface CommGroupNetworkHandler extends EventHandler<GroupCommunicationMessage> {
+
+ void register(Class<? extends Name<String>> operName, EventHandler<GroupCommunicationMessage> handler);
+
+ void addTopologyElement(Class<? extends Name<String>> operName);
+
+ GroupCommunicationMessage waitForTopologyUpdate(Class<? extends Name<String>> operName);
+
+ byte[] waitForTopologyChanges(Class<? extends Name<String>> operName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
new file mode 100644
index 0000000..a1370e5
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The Task side interface of a communication group
+ * Lets one get the operators configured for this task
+ * and use them for communication between tasks configured
+ * in this communication group
+ */
+@TaskSide
+@DefaultImplementation(value = CommunicationGroupClientImpl.class)
+public interface CommunicationGroupClient {
+
+ /**
+ * @return The name configured on this communication group
+ */
+ Class<? extends Name<String>> getName();
+
+ /**
+ * The broadcast sender configured on this communication group
+ * with the given oepratorName
+ *
+ * @param operatorName
+ * @return
+ */
+ Broadcast.Sender getBroadcastSender(Class<? extends Name<String>> operatorName);
+
+ /**
+ * The broadcast receiver configured on this communication group
+ * with the given oepratorName
+ *
+ * @param operatorName
+ * @return
+ */
+ Broadcast.Receiver getBroadcastReceiver(Class<? extends Name<String>> operatorName);
+
+ /**
+ * The reduce receiver configured on this communication group
+ * with the given oepratorName
+ *
+ * @param operatorName
+ * @return
+ */
+ Reduce.Receiver getReduceReceiver(Class<? extends Name<String>> operatorName);
+
+ /**
+ * The reduce sender configured on this communication group
+ * with the given oepratorName
+ *
+ * @param operatorName
+ * @return
+ */
+ Reduce.Sender getReduceSender(Class<? extends Name<String>> operatorName);
+
+ /**
+ * @return Changes in topology of this communication group since the last time
+ * this method was called
+ */
+ GroupChanges getTopologyChanges();
+
+ /**
+ * Asks the driver to update the topology of this communication group. This can
+ * be an expensive call depending on what the minimum number of tasks is for this
+ * group to function as this first tells the driver, driver then tells the affected
+ * tasks and the driver gives a green only after affected tasks have had a chance
+ * to be sure that their topology will be updated before the next message is
+ * communicated
+ */
+ void updateTopology();
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
new file mode 100644
index 0000000..b75af08
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+@Private
+@DefaultImplementation(value = CommunicationGroupClientImpl.class)
+public interface CommunicationGroupServiceClient extends CommunicationGroupClient {
+ /**
+ * Should not be used by user code
+ * Used for initialization of the
+ * communication group
+ */
+ void initialize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
new file mode 100644
index 0000000..14c811b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.group.impl.task.GroupCommClientImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+
+
+/**
+ * The task side interface for the Group Communication Service
+ */
+@TaskSide
+@Provided
+@DefaultImplementation(value = GroupCommClientImpl.class)
+public interface GroupCommClient {
+
+ /**
+ * @param string
+ * @return The communication group client with the given name that gives access
+ * to the operators configured on it that will be used to do group communication
+ */
+ CommunicationGroupClient getCommunicationGroup(Class<? extends Name<String>> groupName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
new file mode 100644
index 0000000..ecea973
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The global EventHandler that receives the GroupCommunicationMsg
+ * and routes it to the relevant communication group
+ */
+@TaskSide
+@DefaultImplementation(value = GroupCommNetworkHandlerImpl.class)
+public interface GroupCommNetworkHandler extends EventHandler<Message<GroupCommunicationMessage>> {
+
+ void register(Class<? extends Name<String>> groupName, EventHandler<GroupCommunicationMessage> commGroupNetworkHandler);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
new file mode 100644
index 0000000..08554f8
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+
+/**
+ * The actual node that is part of the operator topology
+ *
+ * Receives data from the handlers & provides them to the
+ * operators/OperatorTopologyStruct when they need it.
+ *
+ * This implementation decouples the send & receive.
+ */
+public interface NodeStruct {
+
+ String getId();
+
+ int getVersion();
+
+ void setVersion(int version);
+
+ byte[] getData();
+
+ void addData(GroupCommunicationMessage msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
new file mode 100644
index 0000000..62b6934
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * Represents the local topology of tasks for an operator. It
+ * provides methods to send/rcv from parents & children
+ * <p/>
+ * Every operator is an EventHandler<GroupCommunicationMessage>
+ * and it will use an instance of this type to delegate the
+ * handling of the message and also uses it to communicate
+ * with its parents and children
+ * <p/>
+ * This is an operator facing interface. The actual topology is
+ * maintained in OperatorTopologyStruct. Current strategy is to
+ * maintain two versions of the topology and current operations
+ * are always delegated to effectiveTopology and the baseTopology
+ * is updated while initialization & when user calls updateTopology.
+ * So this is only a wrapper around the two versions of topologies
+ * and manages when to create/update them based on the messages from
+ * the driver.
+ */
+public interface OperatorTopology {
+
+ void handle(GroupCommunicationMessage msg);
+
+ void sendToParent(byte[] encode, ReefNetworkGroupCommProtos.GroupCommMessage.Type reduce) throws ParentDeadException;
+
+ byte[] recvFromParent() throws ParentDeadException;
+
+ void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException;
+
+ <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec) throws ParentDeadException;
+
+ void initialize() throws ParentDeadException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
new file mode 100644
index 0000000..dd262c9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.task;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.operators.Sender;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * The actual local topology maintaining the
+ * children and parent that reacts to update
+ * and data msgs. The actual nodes are represented
+ * by NodeStruct and it handles receiving &
+ * providing data
+ */
+public interface OperatorTopologyStruct {
+
+ Class<? extends Name<String>> getGroupName();
+
+ Class<? extends Name<String>> getOperName();
+
+ String getSelfId();
+
+ int getVersion();
+
+ NodeStruct getParent();
+
+ Collection<? extends NodeStruct> getChildren();
+
+ String getDriverId();
+
+ Sender getSender();
+
+ boolean hasChanges();
+
+ void setChanges(boolean b);
+
+ void addAsData(GroupCommunicationMessage msg);
+
+ void update(Set<GroupCommunicationMessage> deletionDeltas);
+
+ void update(GroupCommunicationMessage msg);
+
+ void sendToParent(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
+ byte[] recvFromParent();
+
+ void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
+ <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
new file mode 100644
index 0000000..46eb79e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.serialization.Codec;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class GroupChangesCodec implements Codec<GroupChanges> {
+
+ @Inject
+ public GroupChangesCodec() {
+ }
+
+ @Override
+ public GroupChanges decode(final byte[] changeBytes) {
+ return new GroupChangesImpl(changeBytes[0] == 1);
+ }
+
+ @Override
+ public byte[] encode(final GroupChanges changes) {
+ final byte[] retVal = new byte[1];
+ if (changes.exist()) {
+ retVal[0] = 1;
+ }
+ return retVal;
+ }
+
+ public static void main(final String[] args) {
+ GroupChanges changes = new GroupChangesImpl(false);
+ final GroupChangesCodec changesCodec = new GroupChangesCodec();
+ GroupChanges changes1 = changesCodec.decode(changesCodec.encode(changes));
+ test(changes, changes1);
+ changes = new GroupChangesImpl(true);
+ changes1 = changesCodec.decode(changesCodec.encode(changes));
+ test(changes, changes1);
+ }
+
+ private static void test(final GroupChanges changes, final GroupChanges changes1) {
+
+ final Logger LOG = Logger.getLogger(GroupChangesCodec.class.getName());
+
+ final boolean c1 = changes.exist();
+ final boolean c2 = changes1.exist();
+
+ if (c1 != c2) {
+ LOG.log(Level.SEVERE, "Something is wrong: {0} != {1}", new Object[] {c1, c2});
+ } else {
+ LOG.log(Level.INFO, "Codec is fine");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
new file mode 100644
index 0000000..48da7e7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.api.GroupChanges;
+
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+@Immutable
+@ThreadSafe
+public class GroupChangesImpl implements GroupChanges {
+
+ private final boolean changes;
+
+ public GroupChangesImpl(final boolean changes) {
+ this.changes = changes;
+ }
+
+ @Override
+ public boolean exist() {
+ return changes;
+ }
+
+ @Override
+ public String toString() {
+ return "Changes: " + changes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
new file mode 100644
index 0000000..c02d6af
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+
+import java.util.Arrays;
+
+/**
+ *
+ */
+public class GroupCommunicationMessage {
+ private final String groupName;
+ private final String operName;
+ private final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType;
+ private final String from;
+ private final int srcVersion;
+ private final String to;
+ private final int dstVersion;
+ private final byte[][] data;
+
+ private final String simpleGroupName;
+ private final String simpleOperName;
+
+ public GroupCommunicationMessage(
+ final String groupName,
+ final String operName,
+ final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+ final String from, final int srcVersion,
+ final String to, final int dstVersion,
+ final byte[][] data) {
+ super();
+ this.groupName = groupName;
+ this.operName = operName;
+ this.msgType = msgType;
+ this.from = from;
+ this.srcVersion = srcVersion;
+ this.to = to;
+ this.dstVersion = dstVersion;
+ this.data = data;
+ this.simpleGroupName = Utils.simpleName(Utils.getClass(groupName));
+ this.simpleOperName = Utils.simpleName(Utils.getClass(operName));
+ }
+
+ public String getGroupname() {
+ return groupName;
+ }
+
+ public String getOperatorname() {
+ return operName;
+ }
+
+ public String getSimpleOperName() {
+ return simpleOperName;
+ }
+
+ public ReefNetworkGroupCommProtos.GroupCommMessage.Type getType() {
+ return msgType;
+ }
+
+ public String getSrcid() {
+ return from;
+ }
+
+ public int getSrcVersion() {
+ return srcVersion;
+ }
+
+ public String getDestid() {
+ return to;
+ }
+
+ public int getVersion() {
+ return dstVersion;
+ }
+
+ public String getSource() {
+ return "(" + getSrcid() + "," + getSrcVersion() + ")";
+ }
+
+ public String getDestination() {
+ return "(" + getDestid() + "," + getVersion() + ")";
+ }
+
+ public byte[][] getData() {
+ return data;
+ }
+
+ public int getMsgsCount() {
+ return data.length;
+ }
+
+ public boolean hasVersion() {
+ return true;
+ }
+
+ public boolean hasSrcVersion() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + simpleOperName + "]";
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this != obj) {
+ if (obj instanceof GroupCommunicationMessage) {
+ final GroupCommunicationMessage that = (GroupCommunicationMessage) obj;
+ if (!this.groupName.equals(that.groupName)) {
+ return false;
+ }
+ if (!this.operName.equals(that.operName)) {
+ return false;
+ }
+ if (!this.from.equals(that.from)) {
+ return false;
+ }
+ if (this.srcVersion != that.srcVersion) {
+ return false;
+ }
+ if (!this.to.equals(that.to)) {
+ return false;
+ }
+ if (this.dstVersion != that.dstVersion) {
+ return false;
+ }
+ if (!this.msgType.equals(that.msgType)) {
+ return false;
+ }
+ if (this.data.length != that.data.length) {
+ return false;
+ }
+ for (int i = 0; i < data.length; i++) {
+ if (!Arrays.equals(this.data[i], that.data[i])) {
+ return false;
+ }
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
new file mode 100644
index 0000000..8b5225d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl;
+
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
+
+import javax.inject.Inject;
+import java.io.*;
+
+/**
+ * Codec for {@link GroupCommMessage}
+ */
+public class GroupCommunicationMessageCodec implements StreamingCodec<GroupCommunicationMessage> {
+
+ @Inject
+ public GroupCommunicationMessageCodec() {
+ // Intentionally Blank
+ }
+
+ @Override
+ public GroupCommunicationMessage decode(final byte[] data) {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+ try (DataInputStream dais = new DataInputStream(bais)) {
+ return decodeFromStream(dais);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+ @Override
+ public GroupCommunicationMessage decodeFromStream(final DataInputStream stream) {
+ try {
+ final String groupName = stream.readUTF();
+ final String operName = stream.readUTF();
+ final Type msgType = Type.valueOf(stream.readInt());
+ final String from = stream.readUTF();
+ final int srcVersion = stream.readInt();
+ final String to = stream.readUTF();
+ final int dstVersion = stream.readInt();
+ final byte[][] gcmData = new byte[stream.readInt()][];
+ for (int i = 0; i < gcmData.length; i++) {
+ gcmData[i] = new byte[stream.readInt()];
+ stream.readFully(gcmData[i]);
+ }
+ return new GroupCommunicationMessage(
+ groupName,
+ operName,
+ msgType,
+ from,
+ srcVersion,
+ to,
+ dstVersion,
+ gcmData);
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+ @Override
+ public byte[] encode(final GroupCommunicationMessage msg) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (DataOutputStream daos = new DataOutputStream(baos)) {
+ encodeToStream(msg, daos);
+ }
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+ @Override
+ public void encodeToStream(final GroupCommunicationMessage msg, final DataOutputStream stream) {
+ try {
+ stream.writeUTF(msg.getGroupname());
+ stream.writeUTF(msg.getOperatorname());
+ stream.writeInt(msg.getType().getNumber());
+ stream.writeUTF(msg.getSrcid());
+ stream.writeInt(msg.getSrcVersion());
+ stream.writeUTF(msg.getDestid());
+ stream.writeInt(msg.getVersion());
+ stream.writeInt(msg.getMsgsCount());
+ for (final byte[] b : msg.getData()) {
+ stream.writeInt(b.length);
+ stream.write(b);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
new file mode 100644
index 0000000..11514b1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+
+/**
+ * The specification for the broadcast operator
+ */
+public class BroadcastOperatorSpec implements OperatorSpec {
+ private final String senderId;
+
+ /**
+ * Codec to be used to serialize data
+ */
+ private final Class<? extends Codec> dataCodecClass;
+
+
+ public BroadcastOperatorSpec(final String senderId,
+ final Class<? extends Codec> dataCodecClass) {
+ super();
+ this.senderId = senderId;
+ this.dataCodecClass = dataCodecClass;
+ }
+
+ public String getSenderId() {
+ return senderId;
+ }
+
+ @Override
+ public Class<? extends Codec> getDataCodecClass() {
+ return dataCodecClass;
+ }
+
+ @Override
+ public String toString() {
+ return "Broadcast Operator Spec: [sender=" + senderId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass)
+ + "]";
+ }
+
+ public static Builder newBuilder() {
+ return new BroadcastOperatorSpec.Builder();
+ }
+
+ public static class Builder implements org.apache.reef.util.Builder<BroadcastOperatorSpec> {
+ private String senderId;
+
+ private Class<? extends Codec> dataCodecClass;
+
+
+ public Builder setSenderId(final String senderId) {
+ this.senderId = senderId;
+ return this;
+ }
+
+ public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) {
+ this.dataCodecClass = codecClazz;
+ return this;
+ }
+
+ @Override
+ public BroadcastOperatorSpec build() {
+ return new BroadcastOperatorSpec(senderId, dataCodecClass);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
new file mode 100644
index 0000000..386ad13
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Reduce operator
+ */
+public class ReduceOperatorSpec implements OperatorSpec {
+
+ private final String receiverId;
+
+ /**
+ * Codec to be used to serialize data
+ */
+ private final Class<? extends Codec> dataCodecClass;
+
+ /**
+ * The reduce function to be used for operations that do reduction
+ */
+ public final Class<? extends ReduceFunction> redFuncClass;
+
+
+ public ReduceOperatorSpec(final String receiverId,
+ final Class<? extends Codec> dataCodecClass,
+ final Class<? extends ReduceFunction> redFuncClass) {
+ super();
+ this.receiverId = receiverId;
+ this.dataCodecClass = dataCodecClass;
+ this.redFuncClass = redFuncClass;
+ }
+
+ public String getReceiverId() {
+ return receiverId;
+ }
+
+ /**
+ * @return the redFuncClass
+ */
+ public Class<? extends ReduceFunction> getRedFuncClass() {
+ return redFuncClass;
+ }
+
+ @Override
+ public Class<? extends Codec> getDataCodecClass() {
+ return dataCodecClass;
+ }
+
+ @Override
+ public String toString() {
+ return "Reduce Operator Spec: [receiver=" + receiverId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass)
+ + "] [reduceFunctionClass=" + Utils.simpleName(redFuncClass) + "]";
+ }
+
+ public static Builder newBuilder() {
+ return new ReduceOperatorSpec.Builder();
+ }
+
+ public static class Builder implements org.apache.reef.util.Builder<ReduceOperatorSpec> {
+
+ private String receiverId;
+
+ private Class<? extends Codec> dataCodecClass;
+
+ private Class<? extends ReduceFunction> redFuncClass;
+
+ public Builder setReceiverId(final String receiverId) {
+ this.receiverId = receiverId;
+ return this;
+ }
+
+ public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) {
+ this.dataCodecClass = codecClazz;
+ return this;
+ }
+
+ public Builder setReduceFunctionClass(final Class<? extends ReduceFunction> redFuncClass) {
+ this.redFuncClass = redFuncClass;
+ return this;
+ }
+
+ @Override
+ public ReduceOperatorSpec build() {
+ return new ReduceOperatorSpec(receiverId, dataCodecClass, redFuncClass);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
new file mode 100644
index 0000000..49dab96
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Name of the comm group")
+public final class CommunicationGroupName implements Name<String> {
+ private CommunicationGroupName() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
new file mode 100644
index 0000000..82613a7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Codec used to serialize and deserialize data in operators")
+public final class DataCodec implements Name<Codec> {
+ private DataCodec() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
new file mode 100644
index 0000000..9098577
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Name of the operator")
+public final class OperatorName implements Name<String> {
+ private OperatorName() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
new file mode 100644
index 0000000..d37408b
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The reduce function class that is associated with a reduce operator")
+public final class ReduceFunctionParam implements Name<ReduceFunction> {
+ private ReduceFunctionParam() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
new file mode 100644
index 0000000..1844766
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+@NamedParameter(doc = "Serialized communication group configurations")
+public final class SerializedGroupConfigs implements Name<Set<String>> {
+ private SerializedGroupConfigs() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
new file mode 100644
index 0000000..e0103f5
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+@NamedParameter(doc = "Serialized operator configurations")
+public final class SerializedOperConfigs implements Name<Set<String>> {
+ private SerializedOperConfigs() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
new file mode 100644
index 0000000..b1caa84
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The version that this task is assigned")
+public final class TaskVersion implements Name<Integer> {
+ private TaskVersion() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
new file mode 100644
index 0000000..c759130
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The fan out for the tree topology", default_value = "2", short_name = "fanout")
+public final class TreeTopologyFanOut implements Name<Integer> {
+ private TreeTopologyFanOut() {
+ }
+}