You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/07 01:22:10 UTC

incubator-reef git commit: [REEF-174] Use type instead of instance for Codec in MpiDriver

Repository: incubator-reef
Updated Branches:
  refs/heads/master 940752b9f -> 2a1565ff3


[REEF-174]  Use type instead of instance for Codec in MpiDriver

Currently, a Codec instance is passed to the specifications in
MpiDriver. What we really need is the type that will be bound to Tang
Configuration. This change is to pass generic type into MdiDriver and
Specification instead so that any type of ICodec can be passed in. At
the meantime, we limit the type to be an instance of ICodec to ensure
correct type is passed. If not, catching the error at build time.
IOperatorSpec, Itopology, ICommunicationGroupDriver API and
implementations are modified accordingly.

Test cases are updated as well.

JIRA:
  [REEF-174](https://issues.apache.org/jira/browse/REEF-174)

Pull Request:
  This closes #132


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a1565ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a1565ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a1565ff

Branch: refs/heads/master
Commit: 2a1565ff3c44a97ee4379e2e793e438d62d54460
Parents: 940752b
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Mar 31 17:48:59 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Apr 6 16:19:59 2015 -0700

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              |   6 +-
 .../Group/Driver/ICommunicationGroupDriver.cs   |  50 ++----
 .../Driver/Impl/CommunicationGroupDriver.cs     | 161 +++++--------------
 .../Group/Operators/IOperatorSpec.cs            |   7 +-
 .../Operators/Impl/BroadcastOperatorSpec.cs     |   9 +-
 .../Group/Operators/Impl/ReduceOperatorSpec.cs  |  12 +-
 .../Group/Operators/Impl/ScatterOperatorSpec.cs |   9 +-
 .../Group/Topology/FlatTopology.cs              |  34 ++--
 .../Group/Topology/ITopology.cs                 |   5 +-
 .../Group/Topology/TreeTopology.cs              |  34 ++--
 .../BroadcastReduceDriver.cs                    |   8 +-
 .../ScatterReduceTest/ScatterReduceDriver.cs    |   6 +-
 .../Network/GroupCommunicationTests.cs          |  40 +++--
 .../GroupCommunicationTreeTopologyTests.cs      |  32 ++--
 14 files changed, 147 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index 0c67777..358e3e1 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -82,9 +82,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _mpiDriver = mpiDriver;
 
             _commGroup = _mpiDriver.DefaultGroup
-                   .AddBroadcast(Constants.CentroidsBroadcastOperatorName,Constants.MasterTaskId, new CentroidsCodec())
-                   .AddBroadcast(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId, new ControlMessageCodec())
-                   .AddReduce(Constants.MeansReduceOperatorName, Constants.MasterTaskId, new ProcessedResultsCodec(), new KMeansMasterTask.AggregateMeans())
+                   .AddBroadcast<Centroids, CentroidsCodec>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId)
+                   .AddBroadcast<ControlMessage, ControlMessageCodec>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId)
+                   .AddReduce<ProcessedResults, ProcessedResultsCodec>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, new KMeansMasterTask.AggregateMeans())
                    .Build();
 
             _mpiTaskStarter = new TaskStarter(_mpiDriver, _totalEvaluators);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index 5a857e0..0fa4aae 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -41,28 +41,17 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
-        /// <param name="operatorName">The name of the broadcast operator</param>
-        /// <param name="spec">The specification that defines the Broadcast operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        [System.Obsolete("use AddBroadcast<T>(string operatorName, string masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
-
-        /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
-        /// <param name="codecType">The Codec used for serialization</param>
         /// <param name="topologyType">The topology type for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, string masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
 
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group. Default to IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
         /// <param name="topologyType">The topology type for the operator</param>
@@ -72,46 +61,35 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="spec">The specification that defines the Reduce operator</param>
+        /// <param name="masterTaskId">The master task id for the typology</param>
+        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        [System.Obsolete("use AddReduce<T>(string operatorName, string masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
 
         /// <summary>
-        /// Adds the Reduce MPI operator to the communication group.
+        /// Adds the Reduce MPI operator to the communication group with default IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="codecType">The codec used for serializing messages.</param>
         /// <param name="reduceFunction">The class used to aggregate all messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        ICommunicationGroupDriver AddReduce<T>(string operatorName, string masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat);
-
-        /// <summary>
-        /// Adds the Scatter MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
-        /// <param name="operatorName">The name of the scatter operator</param>
-        /// <param name="spec">The specification that defines the Scatter operator</param>
-        /// <param name="topologyType">type of topology used in the operaor</param>
-        /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        [System.Obsolete("use AddScatter<T>(string operatorName, string senderId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddReduce(string operatorName, string masterTaskId, IReduceFunction<int> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat);
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
-        /// <param name="codecType">The codec used for serializing messages.</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        ICommunicationGroupDriver AddScatter<T>(string operatorName, string senderId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddScatter<TMessage, TMessageCodec>(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group with default Codec

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 154a0f5..6c07598 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -91,68 +91,32 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public List<string> TaskIds { get; private set; }
 
         /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
-        /// <param name="operatorName">The name of the broadcast operator</param>
-        /// <param name="spec">The specification that defines the Broadcast operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        [System.Obsolete("use AddBroadcast<T>(string operatorName, string masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddBroadcast<T>(
-            string operatorName,
-            BroadcastOperatorSpec<T> spec,
-            TopologyTypes topologyType = TopologyTypes.Flat)
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-        /// <summary>
-        /// Adds the Broadcast MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
-        /// <param name="codecType">The Codec used for serialization</param>
         /// <param name="topologyType">The topology type for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        public ICommunicationGroupDriver AddBroadcast<T>(string operatorName, string masterTaskId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)
+        /// <returns></returns>
+        public ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new BroadcastOperatorSpec<T>(
-                masterTaskId,
-                codecType);
+            var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
+                masterTaskId);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
                     _fanOut);
             }
 
@@ -165,7 +129,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group. Default to IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
         /// <param name="topologyType">The topology type for the operator</param>
@@ -173,45 +136,43 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public ICommunicationGroupDriver AddBroadcast(string operatorName, string masterTaskId,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddBroadcast(operatorName, masterTaskId, new IntCodec(), topologyType);
+            return AddBroadcast<int,IntCodec>(operatorName, masterTaskId, topologyType);
         }
 
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="codecType">The codec used for serializing messages.</param>
         /// <param name="reduceFunction">The class used to aggregate all messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        public ICommunicationGroupDriver AddReduce<T>(
+        public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(
             string operatorName,
             string masterTaskId,
-            ICodec<T> codecType,
-            IReduceFunction<T> reduceFunction,
-            TopologyTypes topologyType = TopologyTypes.Flat)
+            IReduceFunction<TMessage> reduceFunction,
+            TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new ReduceOperatorSpec<T>(
+            var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
                 masterTaskId,
-                codecType,
                 reduceFunction);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
+                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
+                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
                     _fanOut);
             }
 
@@ -222,99 +183,49 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Reduce MPI operator to the communication group.
+        /// Adds the Reduce MPI operator to the communication group with default IntCodec
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
         /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="spec">The specification that defines the Reduce operator</param>
+        /// <param name="masterTaskId">The master task id for the typology</param>
+        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        [System.Obsolete("use AddReduce<T>(string operatorName, string masterTaskId, ICodec<T> codecType, IReduceFunction<T> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddReduce<T>(
+        public ICommunicationGroupDriver AddReduce(
             string operatorName,
-            ReduceOperatorSpec<T> spec,
+            string masterTaskId,
+            IReduceFunction<int> reduceFunction,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-        /// <summary>
-        /// Adds the Scatter MPI operator to the communication group.
-        /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
-        /// <param name="operatorName">The name of the scatter operator</param>
-        /// <param name="spec">The specification that defines the Scatter operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        [System.Obsolete("use AddScatter<T>(string operatorName, string senderId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)")]
-        public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat)
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the spec has been built.");
-            }
-
-            ITopology<T> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec,
-                    _fanOut);
-            }
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
+            return AddReduce<int, IntCodec>(operatorName, masterTaskId, reduceFunction, topologyType);
         }
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group.
         /// </summary>
-        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
-        /// <param name="codecType">The codec used for serializing messages.</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        public ICommunicationGroupDriver AddScatter<T>(string operatorName, string senderId, ICodec<T> codecType, TopologyTypes topologyType = TopologyTypes.Flat)
+        public ICommunicationGroupDriver AddScatter<TMessage, TMessageCodec>(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new ScatterOperatorSpec<T>(senderId, codecType);
+            var spec = new ScatterOperatorSpec<TMessage, TMessageCodec>(senderId);
 
-            ITopology<T> topology;
+            ITopology<TMessage, TMessageCodec> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
                     _fanOut);
             }
             _topologies[operatorName] = topology;
@@ -324,7 +235,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Scatter MPI operator to the communication group with default Codec
+        /// Adds the Scatter MPI operator to the communication group with default IntCodec
         /// </summary>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
@@ -332,7 +243,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
         public ICommunicationGroupDriver AddScatter(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddScatter(operatorName, senderId, new IntCodec(), topologyType);
+            return AddScatter<int, IntCodec>(operatorName, senderId, topologyType);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
index b1c119c..4a1bfd7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators
@@ -24,11 +25,11 @@ namespace Org.Apache.REEF.Network.Group.Operators
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public interface IOperatorSpec<T>
+    public interface IOperatorSpec<T1, T2> where T2 : ICodec<T1>
     {
         /// <summary>
-        /// Returns the codec used to serialize and deserialize messages.
+        /// Returns the codec type used to serialize and deserialize messages.
         /// </summary>
-        ICodec<T> Codec { get; }
+        Type Codec { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
index 904e4ef..15a4374 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,17 +25,17 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public class BroadcastOperatorSpec<T> : IOperatorSpec<T>
+    public class BroadcastOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : ICodec<T1>
     {
         /// <summary>
         /// Create a new BroadcastOperatorSpec.
         /// </summary>
         /// <param name="senderId">The identifier of the root sending Task.</param>
         /// <param name="codecType">The codec used to serialize messages.</param>
-        public BroadcastOperatorSpec(string senderId, ICodec<T> codecType)
+        public BroadcastOperatorSpec(string senderId)
         {
             SenderId = senderId;
-            Codec = codecType;
+            Codec = typeof(T2);
         }
 
         /// <summary>
@@ -45,6 +46,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Returns the ICodec used to serialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
index 37b6ce7..f72cea5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,7 +25,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Reduce MPI Operators.
     /// </summary>
-    public class ReduceOperatorSpec<T> : IOperatorSpec<T>
+    public class ReduceOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : ICodec<T1>
     {
         /// <summary>
         /// Creates a new ReduceOperatorSpec.
@@ -35,11 +36,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="reduceFunction">The class used to aggregate all messages.</param>
         public ReduceOperatorSpec(
             string receiverId, 
-            ICodec<T> codec, 
-            IReduceFunction<T> reduceFunction)
+            IReduceFunction<T1> reduceFunction)
         {
             ReceiverId = receiverId;
-            Codec = codec;
+            Codec = typeof(T2);
             ReduceFunction = reduceFunction;
         }
 
@@ -52,11 +52,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// The codec used to serialize and deserialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
 
         /// <summary>
         /// The class used to aggregate incoming messages.
         /// </summary>
-        public IReduceFunction<T> ReduceFunction { get; private set; } 
+        public IReduceFunction<T1> ReduceFunction { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
index 57cd3a9..158a6c5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
@@ -24,7 +25,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// <summary>
     /// The specification used to define Scatter MPI Operators.
     /// </summary>
-    public class ScatterOperatorSpec<T> : IOperatorSpec<T>
+    public class ScatterOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : ICodec<T1>
     {
         /// <summary>
         /// Creates a new ScatterOperatorSpec.
@@ -33,10 +34,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// be sending messages</param>
         /// <param name="codec">The codec used to serialize and 
         /// deserialize messages</param>
-        public ScatterOperatorSpec(string senderId, ICodec<T> codec)
+        public ScatterOperatorSpec(string senderId)
         {
             SenderId = senderId;
-            Codec = codec;
+            Codec = typeof(T2);
         }
 
         /// <summary>
@@ -48,6 +49,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// The codec used to serialize and deserialize messages.
         /// </summary>
-        public ICodec<T> Codec { get; private set; }
+        public Type Codec { get; private set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index 892c82c..83990d7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
     /// nodes: the root and all children extending from the root.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class FlatTopology<T> : ITopology<T>
+    public class FlatTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
     {
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -58,7 +58,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T> operatorSpec)
+            IOperatorSpec<T1, T2> operatorSpec)
         {
             _groupName = groupName;
             _operatorName = operatorName;
@@ -73,7 +73,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         /// <summary>
         /// Gets the Operator specification
         /// </summary>
-        public IOperatorSpec<T> OperatorSpec { get; set; }
+        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -83,7 +83,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         public IConfiguration GetTaskConfiguration(string taskId)
         {
             var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
                 .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
                     GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
                     _rootId);
@@ -101,42 +101,42 @@ namespace Org.Apache.REEF.Network.Group.Topology
                 }
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T>)
+            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>;
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastSender<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastReceiver<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T>)
+            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T> reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
+                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType());
                 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ReduceReceiver<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ReduceSender<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T>)
+            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
             {
-                ScatterOperatorSpec<T> scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as ScatterOperatorSpec<T1, T2>;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ScatterSender<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ScatterReceiver<T1>>.Class);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
index 32fe5cc..3f15318 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -19,15 +19,16 @@
 
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
     /// <summary>
     /// Represents a topology graph for IMpiOperators.
     /// </summary>
-    public interface ITopology<T>
+    public interface ITopology<T1, T2> where T2 : ICodec<T1>
     {
-        IOperatorSpec<T> OperatorSpec { get; }
+        IOperatorSpec<T1, T2> OperatorSpec { get; }
 
         IConfiguration GetTaskConfiguration(string taskId);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index 50b2636..bc324cf 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -29,7 +29,7 @@ using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
-    public class TreeTopology<T> : ITopology<T>
+    public class TreeTopology<T1, T2> : ITopology<T1, T2> where T2 : ICodec<T1>
     {
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -58,7 +58,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             string groupName, 
             string rootId,
             string driverId,
-            IOperatorSpec<T> operatorSpec,
+            IOperatorSpec<T1, T2> operatorSpec,
             int fanOut)
         {
             _groupName = groupName;
@@ -72,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
             _nodes = new Dictionary<string, TaskNode>(); 
         }
 
-        public IOperatorSpec<T> OperatorSpec { get; set; }
+        public IOperatorSpec<T1, T2> OperatorSpec { get; set; }
 
         /// <summary>
         /// Gets the task configuration for the operator topology.
@@ -105,7 +105,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
             //add parentid, if no parent, add itself
             var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
                 .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
                     GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
                     parentId);
@@ -118,42 +118,42 @@ namespace Org.Apache.REEF.Network.Group.Topology
                     childNode.TaskId);
             }
 
-            if (OperatorSpec is BroadcastOperatorSpec<T>)
+            if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>;
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastSender<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastReceiver<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ReduceOperatorSpec<T>)
+            else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T> reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
+                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType());
 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ReduceReceiver<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ReduceSender<T1>>.Class);
                 }
             }
-            else if (OperatorSpec is ScatterOperatorSpec<T>)
+            else if (OperatorSpec is ScatterOperatorSpec<T1, T2>)
             {
-                ScatterOperatorSpec<T> scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+                ScatterOperatorSpec<T1, T2> scatterSpec = OperatorSpec as ScatterOperatorSpec<T1, T2>;
                 if (taskId.Equals(scatterSpec.SenderId))
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ScatterSender<T1>>.Class);
                 }
                 else
                 {
-                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<ScatterReceiver<T1>>.Class);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
index ed3e7b5..8cc32b8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
@@ -65,14 +65,12 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
             _numIterations = numIterations;
             _mpiDriver = mpiDriver;
             _commGroup = _mpiDriver.DefaultGroup
-                    .AddBroadcast(
+                    .AddBroadcast<int, IntCodec>(
                         MpiTestConstants.BroadcastOperatorName,
-                       MpiTestConstants.MasterTaskId,
-                            new IntCodec())
-                    .AddReduce(
+                       MpiTestConstants.MasterTaskId)
+                    .AddReduce<int, IntCodec>(
                         MpiTestConstants.ReduceOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(), 
                             new SumFunction())
                     .Build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
index 6029bfe..a71e886 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
@@ -63,15 +63,13 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
             _numEvaluators = numEvaluators;
             _mpiDriver = mpiDriver; 
             _commGroup = _mpiDriver.DefaultGroup
-                    .AddScatter(
+                    .AddScatter<int, IntCodec>(
                         MpiTestConstants.ScatterOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(),
                             TopologyTypes.Tree)
-                    .AddReduce(
+                    .AddReduce<int, IntCodec>(
                         MpiTestConstants.ReduceOperatorName,
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec(), 
                             new SumFunction())
                     .Build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
index 2dc9445..5f931f2 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
@@ -102,14 +102,12 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast<int>(
+                .AddBroadcast<int, IntCodec>(
                     broadcastOperatorName,
-                    masterTaskId,
-                    new IntCodec())
-                .AddReduce<int>(
+                    masterTaskId)
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
                     new SumFunction())
                 .Build();
 
@@ -159,14 +157,12 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter<int>(
+                .AddScatter<int, IntCodec>(
                     scatterOperatorName,
-                    masterTaskId,
-                    new IntCodec())
-                .AddReduce(
+                    masterTaskId)
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                         masterTaskId,
-                        new IntCodec(),
                         new SumFunction())
                 .Build();
 
@@ -224,7 +220,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast(operatorName, masterTaskId, new IntCodec())
+                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -292,7 +288,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-              .AddBroadcast(operatorName, masterTaskId, new IntCodec())
+              .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
               .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -331,7 +327,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, "task0", new IntCodec(), new SumFunction())
+                .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -366,7 +362,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, "task0", new IntCodec(), new SumFunction())
+                .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -410,7 +406,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -488,7 +484,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -538,7 +534,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -585,7 +581,7 @@ namespace Org.Apache.REEF.Tests.Network
             IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec())
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -622,8 +618,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestConfigurationBroadcastSpec()
         {
-            FlatTopology<int> topology = new FlatTopology<int>("Operator", "Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int>("Sender", new IntCodec()));
+            FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec<int, IntCodec>("Sender"));
 
             topology.AddTask("task1");
             var conf = topology.GetTaskConfiguration("task1");
@@ -635,8 +631,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestConfigurationReduceSpec()
         {
-            FlatTopology<int> topology = new FlatTopology<int>("Operator", "Group", "task1", "driverid",
-                new ReduceOperatorSpec<int>("task1", new IntCodec(), new SumFunction()));
+            FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Group", "task1", "driverid",
+                new ReduceOperatorSpec<int, IntCodec>("task1", new SumFunction()));
 
             topology.AddTask("task1");
             var conf2 = topology.GetTaskConfiguration("task1");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a1565ff/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
index e74c506..667e45f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
@@ -34,8 +34,8 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestTreeTopology()
         {
-            TreeTopology<int> topology = new TreeTopology<int>("Operator", "Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int>("task1", new IntCodec()), 2);
+            TreeTopology<int, IntCodec> topology = new TreeTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
             for (int i = 1; i < 8; i++)
             {
                 string taskid = "task" + i;
@@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddReduce(operatorName, masterTaskId, new IntCodec(), new SumFunction(), TopologyTypes.Tree)
+                .AddReduce<int, IntCodec>(operatorName, masterTaskId, new SumFunction(), TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -117,7 +117,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -193,15 +193,13 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddBroadcast<int>(
+                .AddBroadcast<int, IntCodec>(
                     broadcastOperatorName,
                     masterTaskId,
-                    new IntCodec(), 
                     TopologyTypes.Tree)
-                .AddReduce<int>(
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
                     new SumFunction(),
                     TopologyTypes.Tree)
                 .Build();
@@ -308,7 +306,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -353,7 +351,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -407,7 +405,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -458,7 +456,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -510,7 +508,7 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-                .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree)
+                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
@@ -574,16 +572,14 @@ namespace Org.Apache.REEF.Tests.Network
             var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-              .AddScatter(
+              .AddScatter<int, IntCodec>(
                     scatterOperatorName,
                     masterTaskId,
-                    new IntCodec(), 
                     TopologyTypes.Tree)
-                .AddReduce(
+                .AddReduce<int, IntCodec>(
                     reduceOperatorName,
                     masterTaskId,
-                    new IntCodec(),
-                    new SumFunction(), 
+                    new SumFunction(),
                     TopologyTypes.Tree).Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);