You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/20 20:21:12 UTC

[2/2] incubator-reef git commit: [REEF-262] Passing Configuration to Group Communication Driver

[REEF-262]  Passing Configuration to Group Communication Driver

Today clients pass parameters, and inside the Group Communication driver we
create configurations for Codecs, Reduce Functions and DataConverters.
This doesn't give clients the flexibility to set parameters on those
functions, and it also makes the Group code complex. This PR moves all those
settings to Tang Configuration. It allows clients to bind their own
implementations and parameters and then pass the configuration to the Group
Communication Driver.

This PR includes:
  - Added Configuration Module for Codec, ReduceFunction and
    DataConverter
  - Updated GroupCommunicationDriver APIs to accept Configuration
  - Simplified GroupCommunicationDriver by removing unnecessary
    method overloads
  - Updated specification to wrap Configuration and removed Codec and
    DataConverter
  - Removed generics from specifications as they are no longer needed
  - Removed the Codec generic type from the GroupCommunicationDriver API
  - Updated unit tests, functional tests and KMeans tests

JIRA:
  [REEF-262](https://issues.apache.org/jira/browse/REEF-262)

Pull Request:
  This closes #151


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/37746a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/37746a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/37746a04

Branch: refs/heads/master
Commit: 37746a04477e45235532438725f5c15870a832cd
Parents: 0da6b50
Author: Julia Wang <jw...@yahoo.com>
Authored: Fri Apr 17 09:03:52 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Apr 20 11:16:30 2015 -0700

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              |  36 ++-
 .../BroadcastReduceDriver.cs                    |  34 ++-
 .../PipelinedBroadcastReduceDriver.cs           |  43 ++--
 .../ScatterReduceDriver.cs                      |  38 +++-
 .../GroupCommunicationTests.cs                  | 115 ++++++++--
 .../GroupCommunicationTreeTopologyTests.cs      |  76 +++++--
 .../Org.Apache.REEF.Network.Tests.csproj        |   4 +
 .../Group/Config/CodecConfiguration.cs          |  42 ++++
 .../PipelineDataConverterConfiguration.cs       |  40 ++++
 .../Group/Config/ReduceFunctionConfiguration.cs |  45 ++++
 .../Group/Driver/ICommunicationGroupDriver.cs   |  62 ++----
 .../Driver/Impl/CommunicationGroupDriver.cs     | 220 ++++++-------------
 .../Group/Operators/IOperatorSpec.cs            |  10 +-
 .../Operators/Impl/BroadcastOperatorSpec.cs     |  43 +---
 .../Group/Operators/Impl/ReduceOperatorSpec.cs  |  49 +----
 .../Group/Operators/Impl/ScatterOperatorSpec.cs |  43 +---
 .../Group/Pipelining/IPipelineDataConverter.cs  |   7 -
 .../Impl/DefaultPipelineDataConverter.cs        |   5 -
 .../Group/Topology/FlatTopology.cs              |  52 ++---
 .../Group/Topology/ITopology.cs                 |   4 +-
 .../Group/Topology/TreeTopology.cs              |  49 ++---
 .../Org.Apache.REEF.Network.csproj              |   3 +
 .../Functional/ML/KMeans/TestKMeans.cs          |   2 +-
 23 files changed, 574 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index 08173cc..f4c1ca8 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -34,6 +34,7 @@ using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Network.NetworkService.Codec;
 using Org.Apache.REEF.Tang.Annotations;
@@ -43,6 +44,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Network.Group.Topology;
 
 namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 {
@@ -81,10 +83,38 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
             _groupCommDriver = groupCommDriver;
 
+            IConfiguration conf1 = CodecConfiguration<Centroids>.Conf
+                .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig1 = PipelineDataConverterConfiguration<Centroids>.Conf
+                .Set(PipelineDataConverterConfiguration<Centroids>.DataConverter, GenericType<DefaultPipelineDataConverter<Centroids>>.Class)
+                .Build();
+
+            IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf
+                .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig2 = PipelineDataConverterConfiguration<ControlMessage>.Conf
+                .Set(PipelineDataConverterConfiguration<ControlMessage>.DataConverter, GenericType<DefaultPipelineDataConverter<ControlMessage>>.Class)
+                .Build();
+
+            IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf
+                .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
+                .Build();
+
+            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf
+                .Set(ReduceFunctionConfiguration<ProcessedResults>.ReduceFunction, GenericType<KMeansMasterTask.AggregateMeans>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig3 = PipelineDataConverterConfiguration<ProcessedResults>.Conf
+                .Set(PipelineDataConverterConfiguration<ProcessedResults>.DataConverter, GenericType<DefaultPipelineDataConverter<ProcessedResults>>.Class)
+                .Build();
+
             _commGroup = _groupCommDriver.DefaultGroup
-                   .AddBroadcast<Centroids, CentroidsCodec>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId)
-                   .AddBroadcast<ControlMessage, ControlMessageCodec>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId)
-                   .AddReduce<ProcessedResults, ProcessedResultsCodec>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, new KMeansMasterTask.AggregateMeans())
+                   .AddBroadcast<Centroids>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf1, dataConverterConfig1)
+                   .AddBroadcast<ControlMessage>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf2, dataConverterConfig2)
+                   .AddReduce<ProcessedResults>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf3, reduceFunctionConfig, dataConverterConfig3)
                    .Build();
 
             _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalEvaluators);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index 760d3b8..fd27557 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -27,15 +27,20 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
@@ -61,14 +66,33 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             _numEvaluators = numEvaluators;
             _numIterations = numIterations;
             _groupCommDriver = groupCommDriver;
+
+            IConfiguration codecConfig = CodecConfiguration<int>.Conf
+                .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
+                .Build();
+
+            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int>.Conf
+                .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig = PipelineDataConverterConfiguration<int>.Conf
+                .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
+                .Build();
+
             _commGroup = _groupCommDriver.DefaultGroup
-                    .AddBroadcast<int, IntCodec>(
+                    .AddBroadcast<int>(
                         GroupTestConstants.BroadcastOperatorName,
-                       GroupTestConstants.MasterTaskId)
-                    .AddReduce<int, IntCodec>(
+                        GroupTestConstants.MasterTaskId, 
+                        TopologyTypes.Tree,
+                        codecConfig,
+                        dataConverterConfig)
+                    .AddReduce<int>(
                         GroupTestConstants.ReduceOperatorName,
-                            GroupTestConstants.MasterTaskId,
-                            new SumFunction())
+                        GroupTestConstants.MasterTaskId,
+                        TopologyTypes.Tree,
+                        codecConfig,
+                        reduceFunctionConfig,
+                        dataConverterConfig)
                     .Build();
 
             _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 5d00fb1..6d06787 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -27,6 +27,8 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -34,11 +36,13 @@ using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
 {
@@ -61,27 +65,47 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
             GroupCommDriver groupCommDriver)
         {
-            Logger.Log(Level.Info, "*******entering the driver code " + chunkSize);
+            Logger.Log(Level.Info, "entering the driver code " + chunkSize);
 
             Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
             _numIterations = numIterations;
             _chunkSize = chunkSize;
 
+            IConfiguration codecConfig = CodecConfiguration<int[]>.Conf
+                .Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class)
+                .Build();
+
+            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
+                .Set(ReduceFunctionConfiguration<int[]>.ReduceFunction, GenericType<ArraySumFunction>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                PipelineDataConverterConfiguration<int[]>.Conf
+                    .Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
+                        GenericType<PipelineIntDataConverter>.Class)
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+                    GenericType<GroupTestConfig.ChunkSize>.Class,
+                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
             _groupCommDriver = groupCommDriver;
 
             _commGroup = _groupCommDriver.DefaultGroup
-                .AddBroadcast<int[], IntArrayCodec>(
+                .AddBroadcast<int[]>(
                     GroupTestConstants.BroadcastOperatorName,
                     GroupTestConstants.MasterTaskId,
                     TopologyTypes.Tree,
-                    new PipelineIntDataConverter(_chunkSize))
-                .AddReduce<int[], IntArrayCodec>(
+                    codecConfig,
+                    dataConverterConfig)
+                .AddReduce<int[]>(
                     GroupTestConstants.ReduceOperatorName,
                     GroupTestConstants.MasterTaskId,
-                    new ArraySumFunction(),
                     TopologyTypes.Tree,
-                    new PipelineIntDataConverter(_chunkSize))
+                    codecConfig,
+                    reduceFunctionConfig,
+                    dataConverterConfig)
                 .Build();
 
             _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators);
@@ -308,13 +332,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
                 return data;
             }
-
-            public IConfiguration GetConfiguration()
-            {
-                return TangFactory.GetTang().NewConfigurationBuilder()
-                .BindNamedParameter<GroupTestConfig.ChunkSize, int>(GenericType<GroupTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture))
-                .Build();
-            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
index 88dac3d..ba812e3 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -19,6 +19,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Globalization;
 using System.Linq;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Common.Tasks;
@@ -26,12 +27,18 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
@@ -56,16 +63,35 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
         {
             Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
-            _groupCommDriver = groupCommDriver; 
+            _groupCommDriver = groupCommDriver;
+
+            IConfiguration codecConfig = CodecConfiguration<int>.Conf
+                .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
+                .Build();
+
+            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int>.Conf
+                .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig = PipelineDataConverterConfiguration<int>.Conf
+                .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
+                .Build();
+
             _commGroup = _groupCommDriver.DefaultGroup
-                    .AddScatter<int, IntCodec>(
+                    .AddScatter<int>(
                         GroupTestConstants.ScatterOperatorName,
                             GroupTestConstants.MasterTaskId,
-                            TopologyTypes.Tree)
-                    .AddReduce<int, IntCodec>(
+                            TopologyTypes.Tree, 
+                            codecConfig,
+                            dataConverterConfig)
+                    .AddReduce<int>(
                         GroupTestConstants.ReduceOperatorName,
-                            GroupTestConstants.MasterTaskId,
-                            new SumFunction())
+                        GroupTestConstants.MasterTaskId,
+                        TopologyTypes.Tree, 
+                        codecConfig,
+                        reduceFunctionConfig,
+                        dataConverterConfig)
+
                     .Build();
 
             _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index cb73cd6..b9092aa 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -26,16 +26,20 @@ using System.Reactive;
 using System.Text;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
 using Org.Apache.REEF.Network.Group.Codec;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.NetworkService.Codec;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -44,6 +48,7 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
+using Constants = Org.Apache.REEF.Common.Constants;
 
 namespace Org.Apache.REEF.Network.Tests.GroupCommunication
 {
@@ -102,13 +107,19 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommunicationDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommunicationDriver.DefaultGroup
-                .AddBroadcast<int, IntCodec>(
+                .AddBroadcast<int>(
                     broadcastOperatorName,
-                    masterTaskId)
-                .AddReduce<int, IntCodec>(
+                    masterTaskId,
+                    TopologyTypes.Flat,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig())
+                .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
-                    new SumFunction())
+                    TopologyTypes.Flat,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig(),
+                    GetDefaulReduceFuncConfig())
                 .Build();
 
             var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup);
@@ -157,13 +168,19 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(
+                .AddScatter<int>(
                     scatterOperatorName,
-                    masterTaskId)
-                .AddReduce<int, IntCodec>(
-                    reduceOperatorName,
+                    masterTaskId,
+                    TopologyTypes.Flat,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig())
+                .AddReduce<int>(
+                        reduceOperatorName,
                         masterTaskId,
-                        new SumFunction())
+                        TopologyTypes.Flat,
+                        GetDefaulCodecConfig(),
+                        GetDefaulReduceFuncConfig(),
+                        GetDefaulDataConverterConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -205,6 +222,43 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         }
 
         [TestMethod]
+        public void TestCodecConfig()
+        {
+            string groupName = "group1";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            string meansReduceOperatorName = "MeansReduce";
+            string centroidsBroadcastOperatorName = "CentroidsBroadcast";
+            string controlMessageBroadcastOperatorName = "ControlMessageBroadcast";
+
+
+            IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, 2, 5);
+
+            IConfiguration conf1 = CodecConfiguration<Centroids>.Conf
+                .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
+                .Build();
+
+            IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf
+                .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
+                .Build();
+
+            IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf
+                .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
+                .Build();
+
+            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf
+                .Set(ReduceFunctionConfiguration<ProcessedResults>.ReduceFunction, GenericType<KMeansMasterTask.AggregateMeans>.Class)
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(conf3, reduceFunctionConfig);
+            var group = groupCommDriver.DefaultGroup
+                   .AddBroadcast<Centroids>(centroidsBroadcastOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                   .AddBroadcast<ControlMessage>(controlMessageBroadcastOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
+                   .AddReduce<ProcessedResults>(meansReduceOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
+                   .Build();
+        }
+
+        [TestMethod]
         public void TestBroadcastOperator()
         {
             NameServer nameServer = new NameServer(0);
@@ -220,7 +274,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
+                .AddBroadcast(operatorName, masterTaskId)
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -288,7 +342,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-              .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
+              .AddBroadcast(operatorName, masterTaskId)
               .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -327,7 +381,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
+                .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -362,7 +416,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
+                .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, GetDefaulCodecConfig(),GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -484,7 +538,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -534,7 +588,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -581,7 +635,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             var commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -618,8 +672,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         [TestMethod]
         public void TestConfigurationBroadcastSpec()
         {
-            FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int, IntCodec>("Sender"));
+            FlatTopology<int> topology = new FlatTopology<int>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec("Sender", GetDefaulCodecConfig(), GetDefaulDataConverterConfig()));
 
             topology.AddTask("task1");
             var conf = topology.GetTaskConfiguration("task1");
@@ -631,8 +685,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         [TestMethod]
         public void TestConfigurationReduceSpec()
         {
-            FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Group", "task1", "driverid",
-                new ReduceOperatorSpec<int, IntCodec>("task1", new SumFunction()));
+            FlatTopology<int> topology = new FlatTopology<int>("Operator", "Group", "task1", "driverid",
+                new ReduceOperatorSpec("task1", Configurations.Merge(GetDefaulCodecConfig(), GetDefaulDataConverterConfig(),  GetDefaulReduceFuncConfig())));
 
             topology.AddTask("task1");
             var conf2 = topology.GetTaskConfiguration("task1");
@@ -712,6 +766,27 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         {
             return Enumerable.Range(1, n).Sum();
         }
+
+        private IConfiguration GetDefaulCodecConfig()
+        {
+            return CodecConfiguration<int>.Conf
+                .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetDefaulReduceFuncConfig()
+        {
+            return ReduceFunctionConfiguration<int>.Conf
+                .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetDefaulDataConverterConfig()
+        {
+            return PipelineDataConverterConfiguration<int>.Conf
+                .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
+                .Build();
+        }
     }
 
     public class SumFunction : IReduceFunction<int>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index b159153..f8af722 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -18,12 +18,19 @@
  */
 
 using System.Collections.Generic;
+using System.Globalization;
 using System.Linq;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Network.Tests.GroupCommunication
@@ -34,8 +41,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         [TestMethod]
         public void TestTreeTopology()
         {
-            TreeTopology<int, IntCodec> topology = new TreeTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
-                new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
+            TreeTopology<int> topology = new TreeTopology<int>("Operator", "Operator", "task1", "driverid",
+                new BroadcastOperatorSpec("task1", GetDefaulCodecConfig(), GetDefaulDataConverterConfig()), 2);
             for (int i = 1; i < 8; i++)
             {
                 string taskid = "task" + i;
@@ -61,7 +68,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddReduce<int, IntCodec>(operatorName, masterTaskId, new SumFunction(), TopologyTypes.Tree)
+                .AddReduce<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -117,7 +124,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddBroadcast<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -193,15 +200,19 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddBroadcast<int, IntCodec>(
+                .AddBroadcast<int>(
                     broadcastOperatorName,
                     masterTaskId,
-                    TopologyTypes.Tree)
-                .AddReduce<int, IntCodec>(
+                    TopologyTypes.Tree,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig())
+                .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
-                    new SumFunction(),
-                    TopologyTypes.Tree)
+                    TopologyTypes.Tree,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig(),
+                    GetDefaulReduceFuncConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -306,7 +317,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -351,7 +362,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -405,7 +416,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -456,7 +467,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -508,7 +519,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
 
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-                .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
+                .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig())
                 .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
@@ -570,17 +581,21 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             int fanOut = 2;
 
             var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
             ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
-              .AddScatter<int, IntCodec>(
+              .AddScatter<int>(
                     scatterOperatorName,
                     masterTaskId,
-                    TopologyTypes.Tree)
-                .AddReduce<int, IntCodec>(
+                    TopologyTypes.Tree,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig())
+                .AddReduce<int>(
                     reduceOperatorName,
                     masterTaskId,
-                    new SumFunction(),
-                    TopologyTypes.Tree).Build();
+                    TopologyTypes.Tree,
+                    GetDefaulCodecConfig(),
+                    GetDefaulDataConverterConfig(),
+                    GetDefaulReduceFuncConfig())
+                .Build();
 
             var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup);
 
@@ -630,5 +645,26 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             int sum = sumReducer.Reduce();
             Assert.AreEqual(sum, 6325);
         }
+
+        private IConfiguration GetDefaulCodecConfig()
+        {
+            return CodecConfiguration<int>.Conf
+                .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetDefaulReduceFuncConfig()
+        {
+            return ReduceFunctionConfiguration<int>.Conf
+                .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetDefaulDataConverterConfig()
+        {
+            return PipelineDataConverterConfiguration<int>.Conf
+                .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
+                .Build();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index f67e0ea..26dd98a 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -61,6 +61,10 @@ under the License.
       <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
       <Name>Org.Apache.REEF.Common</Name>
     </ProjectReference>
+    <ProjectReference Include="..\Org.Apache.REEF.Examples\Org.Apache.REEF.Examples.csproj">
+      <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
+      <Name>Org.Apache.REEF.Examples</Name>
+    </ProjectReference>
     <ProjectReference Include="..\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
       <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
       <Name>Org.Apache.REEF.Network</Name>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecConfiguration.cs
new file mode 100644
index 0000000..7836c85
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecConfiguration.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    public class CodecConfiguration<T> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// RequiredImpl for Codec. Client needs to set an implementation for this parameter
+        /// </summary>
+        public static readonly RequiredImpl<ICodec<T>> Codec = new RequiredImpl<ICodec<T>>();
+
+        /// <summary>
+        /// Configuration Module for Codec
+        /// </summary>
+        public static ConfigurationModule Conf = new CodecConfiguration<T>()
+            .BindImplementation(GenericType<ICodec<T>>.Class, Codec)
+            .BindImplementation(GenericType<ICodec<PipelineMessage<T>>>.Class, GenericType<PipelineMessageCodec<T>>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Config/PipelineDataConverterConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/PipelineDataConverterConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/PipelineDataConverterConfiguration.cs
new file mode 100644
index 0000000..a598c26
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/PipelineDataConverterConfiguration.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    public class PipelineDataConverterConfiguration<T> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Required implementation parameter for Pipeline Data Converter. Client needs to set an implementation for it.
+        /// </summary>
+        public static readonly RequiredImpl<IPipelineDataConverter<T>> DataConverter = new RequiredImpl<IPipelineDataConverter<T>>();
+
+        /// <summary>
+        /// Configuration Module for Pipeline Data Converter
+        /// </summary>
+        public static ConfigurationModule Conf = new PipelineDataConverterConfiguration<T>()
+            .BindImplementation(GenericType<IPipelineDataConverter<T>>.Class, DataConverter)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Config/ReduceFunctionConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/ReduceFunctionConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/ReduceFunctionConfiguration.cs
new file mode 100644
index 0000000..1fc8ae9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/ReduceFunctionConfiguration.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    public class ReduceFunctionConfiguration<T> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// RequiredImpl for Reduce Function. Client needs to set an implementation for this parameter
+        /// </summary>
+        public static readonly RequiredImpl<IReduceFunction<T>> ReduceFunction = new RequiredImpl<IReduceFunction<T>>();
+        
+        /// <summary>
+        /// Configuration Module for Reduce Function
+        /// </summary>
+        public static ConfigurationModule Conf = new ReduceFunctionConfiguration<T>()
+            .BindImplementation(GenericType<IReduceFunction<T>>.Class, ReduceFunction)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index b47e076..3cc7270 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -43,25 +43,13 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Broadcast Group Communication operator to the communication group.
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configurations for the task</param>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
         /// <param name="topologyType">The topology type for the operator</param>
-        /// <param name="pipelineDataConverter">The class used to convert data back and forth to pipelined one</param>
         /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>;
-
-        /// <summary>
-        /// Adds the Broadcast Group Communication operator to the communication group.
-        /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
-        /// <param name="operatorName">The name of the broadcast operator</param>
-        /// <param name="masterTaskId">The master task id in broadcast operator</param>
-        /// <param name="topologyType">The topology type for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
+        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, string masterTaskId, TopologyTypes topologyType, params IConfiguration[] configurations);
 
         /// <summary>
         /// Adds the Broadcast Group Communication operator to the communication group. Default to IntCodec
@@ -75,57 +63,33 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Adds the Reduce Group Communication operator to the communication group.
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
-        /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
-        /// <param name="topologyType">The topology for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>;
-
-        /// <summary>
-        /// Adds the Reduce Group Communication operator to the communication group.
-        /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configurations for the task</param>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
-
+        ICommunicationGroupDriver AddReduce<T>(string operatorName, string masterTaskId, TopologyTypes topologyType, params IConfiguration[] configurations);
 
         /// <summary>
-        /// Adds the Reduce Group Communication operator to the communication group with default IntCodec
-        /// </summary>
-        /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
-        /// <param name="topologyType">The topology for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        ICommunicationGroupDriver AddReduce(string operatorName, string masterTaskId, IReduceFunction<int> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat);
-
-        /// <summary>
-        /// Adds the Scatter Group Communication operator to the communication group.
+        /// Adds the Scatter Group Communication operator to the communication group with default Codec
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        ICommunicationGroupDriver AddScatter<TMessage, TMessageCodec>(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
+        ICommunicationGroupDriver AddScatter(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat);
 
         /// <summary>
-        /// Adds the Scatter Group Communication operator to the communication group with default Codec
+        /// Adds the Scatter Group Communication operator to the communication group.
         /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configurations for the task</param>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        ICommunicationGroupDriver AddScatter(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat);
+        ICommunicationGroupDriver AddScatter<T>(string operatorName, string senderId, TopologyTypes topologyType, params IConfiguration[] configurations);
 
         /// <summary>
         /// Finalizes the CommunicationGroupDriver.
@@ -150,4 +114,4 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <returns>The Task Configuration for this communication group</returns>
         IConfiguration GetGroupTaskConfiguration(string taskId);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index c3923ae..89238f8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -19,14 +19,15 @@
 
 using System.Collections.Generic;
 using System.Reflection;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
@@ -43,7 +44,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// </summary>
     public class CommunicationGroupDriver : ICommunicationGroupDriver
     {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupDriver));
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof (CommunicationGroupDriver));
 
         private readonly string _groupName;
         private readonly string _driverId;
@@ -94,33 +95,34 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
 
         /// <summary>
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configurations sent to the Evaluator</param>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
         /// <param name="topologyType">The topology type for the operator</param>
-        /// <param name="pipelineDataConverter">The class type used to convert data back and forth to pipelined one</param>
         /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
         /// <returns></returns>
-        public ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>
+        public ICommunicationGroupDriver AddBroadcast<T>(string operatorName, string masterTaskId, TopologyTypes topologyType, params IConfiguration[] configurations)
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
+            var spec = new BroadcastOperatorSpec(
                 masterTaskId,
-                pipelineDataConverter);
+                configurations);
 
-            ITopology<TMessage, TMessageCodec> topology;
+            ITopology<T> topology;
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId,
+                    spec);
             }
             else
             {
-                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId,
+                    spec,
                     _fanOut);
             }
 
@@ -131,135 +133,52 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
+        /// Adds the Broadcast Group Communication operator to the communication group, defaulting to the int message type.
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
         /// <param name="operatorName">The name of the broadcast operator</param>
         /// <param name="masterTaskId">The master task id in broadcast operator</param>
         /// <param name="topologyType">The topology type for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        /// <returns></returns>
-        public ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the spec has been built.");
-            }
-
-            var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
-                masterTaskId,
-                new DefaultPipelineDataConverter<TMessage>());
-
-            ITopology<TMessage, TMessageCodec> topology;
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
-                    _fanOut);
-            }
-
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-        /// <summary>
-        /// Adds the Broadcast Group Communication operator to the communication group. Default to IntCodec
-        /// </summary>
-        /// <param name="operatorName">The name of the broadcast operator</param>
-        /// <param name="masterTaskId">The master task id in broadcast operator</param>
-        /// <param name="topologyType">The topology type for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
-        public ICommunicationGroupDriver AddBroadcast(string operatorName, string masterTaskId,
-            TopologyTypes topologyType = TopologyTypes.Flat)
+        public ICommunicationGroupDriver AddBroadcast(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddBroadcast<int, IntCodec>(operatorName, masterTaskId, topologyType);
+            return AddBroadcast<int>( operatorName, masterTaskId, topologyType, GetDefaulConfiguration());
         }
 
         /// <summary>
         /// Adds the Reduce Group Communication operator to the communication group.
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configuration for the reduce operator</param>
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(
+        public ICommunicationGroupDriver AddReduce<T>(
             string operatorName,
             string masterTaskId,
-            IReduceFunction<TMessage> reduceFunction,
             TopologyTypes topologyType,
-            IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>
-        {
-            if (_finalized)
-            {
-                throw new IllegalStateException("Can't add operators once the spec has been built.");
-            }
-
-            var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
-                masterTaskId,
-                pipelineDataConverter,
-                reduceFunction);
-
-            ITopology<TMessage, TMessageCodec> topology;
-
-            if (topologyType == TopologyTypes.Flat)
-            {
-                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
-            }
-            else
-            {
-                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
-                    _fanOut);
-            }
-
-            _topologies[operatorName] = topology;
-            _operatorSpecs[operatorName] = spec;
-
-            return this;
-        }
-
-               /// <summary>
-        /// Adds the Reduce Group Communication operator to the communication group.
-        /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
-        /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
-        /// <param name="topologyType">The topology for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(
-            string operatorName,
-            string masterTaskId,
-            IReduceFunction<TMessage> reduceFunction,
-            TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
+            params IConfiguration[] configurations) 
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
+            var spec = new ReduceOperatorSpec(
                 masterTaskId,
-                new DefaultPipelineDataConverter<TMessage>(),
-                reduceFunction);
+                configurations);
 
-            ITopology<TMessage, TMessageCodec> topology;
+            ITopology<T> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
+                topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId,
+                    _driverId, spec);
             }
             else
             {
-                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
+                topology = new TreeTopology<T>(operatorName, _groupName, spec.ReceiverId,
+                    _driverId, spec,
                     _fanOut);
             }
 
@@ -270,49 +189,35 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Reduce Group Communication operator to the communication group with default IntCodec
-        /// </summary>
-        /// <param name="operatorName">The name of the reduce operator</param>
-        /// <param name="masterTaskId">The master task id for the typology</param>
-        /// <param name="reduceFunction">The class used to aggregate all messages.</param>
-        /// <param name="topologyType">The topology for the operator</param>
-        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
-        public ICommunicationGroupDriver AddReduce(
-            string operatorName,
-            string masterTaskId,
-            IReduceFunction<int> reduceFunction,
-            TopologyTypes topologyType = TopologyTypes.Flat)
-        {
-            return AddReduce<int, IntCodec>(operatorName, masterTaskId, reduceFunction, topologyType);
-        }
-
-        /// <summary>
         /// Adds the Scatter Group Communication operator to the communication group.
         /// </summary>
-        /// <typeparam name="TMessage">The type of messages that operators will send</typeparam>
-        /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="configurations">The configuration for the scatter operator</param>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        public ICommunicationGroupDriver AddScatter<TMessage, TMessageCodec>(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
+        public ICommunicationGroupDriver AddScatter<T>(string operatorName, string senderId,
+            TopologyTypes topologyType, params IConfiguration[] configurations) 
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the spec has been built.");
             }
 
-            var spec = new ScatterOperatorSpec<TMessage, TMessageCodec>(senderId);
+            var spec = new ScatterOperatorSpec(senderId, configurations);
 
-            ITopology<TMessage, TMessageCodec> topology;
+            ITopology<T> topology;
 
             if (topologyType == TopologyTypes.Flat)
             {
-                topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+                topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId,
+                    spec);
             }
             else
             {
-                topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
+                topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId,
+                    spec,
                     _fanOut);
             }
             _topologies[operatorName] = topology;
@@ -322,15 +227,16 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         }
 
         /// <summary>
-        /// Adds the Scatter Group Communication operator to the communication group with default IntCodec
+        /// Adds the Scatter Group Communication operator to the communication group, using int as the default message type
         /// </summary>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="senderId">The sender id</param>
         /// <param name="topologyType">type of topology used in the operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
-        public ICommunicationGroupDriver AddScatter(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat)
+        public ICommunicationGroupDriver AddScatter(string operatorName, string senderId,
+            TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddScatter<int, IntCodec>(operatorName, senderId, topologyType);
+            return AddScatter<int>(operatorName, senderId, topologyType, GetDefaulConfiguration());
         }
 
         /// <summary>
@@ -354,7 +260,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         {
             if (!_finalized)
             {
-                throw new IllegalStateException("CommunicationGroupDriver must call Build() before adding tasks to the group.");
+                throw new IllegalStateException(
+                    "CommunicationGroupDriver must call Build() before adding tasks to the group.");
             }
 
             lock (_topologyLock)
@@ -362,7 +269,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
                 _tasksAdded++;
                 if (_tasksAdded > _numTasks)
                 {
-                    throw new IllegalStateException("Added too many tasks to Communication Group, expected: " + _numTasks);
+                    throw new IllegalStateException("Added too many tasks to Communication Group, expected: " +
+                                                    _numTasks);
                 }
 
                 TaskIds.Add(taskId);
@@ -406,14 +314,15 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
 
             foreach (var operatorName in _topologies.Keys)
             {
-                var innerConf = TangFactory.GetTang().NewConfigurationBuilder(GetOperatorConfiguration(operatorName, taskId))
-                    .BindNamedParameter<GroupCommConfigurationOptions.DriverId, string>(
-                        GenericType<GroupCommConfigurationOptions.DriverId>.Class,
-                        _driverId)
-                    .BindNamedParameter<GroupCommConfigurationOptions.OperatorName, string>(
-                        GenericType<GroupCommConfigurationOptions.OperatorName>.Class,
-                        operatorName)
-                    .Build();
+                var innerConf =
+                    TangFactory.GetTang().NewConfigurationBuilder(GetOperatorConfiguration(operatorName, taskId))
+                        .BindNamedParameter<GroupCommConfigurationOptions.DriverId, string>(
+                            GenericType<GroupCommConfigurationOptions.DriverId>.Class,
+                            _driverId)
+                        .BindNamedParameter<GroupCommConfigurationOptions.OperatorName, string>(
+                            GenericType<GroupCommConfigurationOptions.OperatorName>.Class,
+                            operatorName)
+                        .Build();
 
                 confBuilder.BindSetEntry<GroupCommConfigurationOptions.SerializedOperatorConfigs, string>(
                     GenericType<GroupCommConfigurationOptions.SerializedOperatorConfigs>.Class,
@@ -427,14 +336,31 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         {
             var topology = _topologies[operatorName];
             MethodInfo info = topology.GetType().GetMethod("AddTask");
-            info.Invoke(topology, new[] { (object)taskId });
+            info.Invoke(topology, new[] {(object) taskId});
         }
 
         private IConfiguration GetOperatorConfiguration(string operatorName, string taskId)
         {
             var topology = _topologies[operatorName];
             MethodInfo info = topology.GetType().GetMethod("GetTaskConfiguration");
-            return (IConfiguration)info.Invoke(topology, new[] { (object)taskId });
+            return (IConfiguration) info.Invoke(topology, new[] {(object) taskId});
+        }
+
+        private IConfiguration[] GetDefaulConfiguration()
+        {
+            List<IConfiguration> list = new List<IConfiguration>(); 
+            IConfiguration codecConfig = CodecConfiguration<int>.Conf
+                .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
+                .Build();
+
+            IConfiguration dataConverterConfig = PipelineDataConverterConfiguration<int>.Conf
+                .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
+                .Build();
+
+            list.Add(codecConfig);
+            list.Add(dataConverterConfig);
+
+            return list.ToArray();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
index 4a1bfd7..920873b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
@@ -17,19 +17,15 @@
  * under the License.
  */
 
-using System;
-using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public interface IOperatorSpec<T1, T2> where T2 : ICodec<T1>
+    public interface IOperatorSpec
     {
-        /// <summary>
-        /// Returns the codec type used to serialize and deserialize messages.
-        /// </summary>
-        Type Codec { get; }
+        IConfiguration Configiration { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/37746a04/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
index cd8122a..ea62588 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -17,58 +17,35 @@
  * under the License.
  */
 
-using System;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
     /// The specification used to define Broadcast Operators.
     /// </summary>
-    public class BroadcastOperatorSpec<T1, T2> : IOperatorSpec<T1, T2> where T2 : ICodec<T1>
+    public class BroadcastOperatorSpec : IOperatorSpec
     {
         /// <summary>
-        /// Create a new BroadcastOperatorSpec.
+        /// Creates the specification for a Broadcast Operator
         /// </summary>
-        /// <param name="senderId">The identifier of the root sending Task.</param>
-        /// <param name="codecType">The codec used to serialize messages.</param>
-        public BroadcastOperatorSpec(string senderId)
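+        /// <param name="senderId">The identifier of the Task that will broadcast data to other Tasks</param>
+        /// <param name="configurations">The configurations that are merged into this specification's Configuration</param>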
+        public BroadcastOperatorSpec(string senderId, params IConfiguration[] configurations)
         {
             SenderId = senderId;
-             Codec = typeof(T2);
-            PipelineDataConverter = new DefaultPipelineDataConverter<T1>();
+            Configiration = Configurations.Merge(configurations);
         }
 
         /// <summary>
-        /// Create a new BroadcastOperatorSpec.
-        /// </summary>
-        /// <param name="senderId">The identifier of the root sending Task.</param>
-        /// <param name="dataConverter">The converter used to convert original
-        /// message to pipelined ones and vice versa.</param>
-        public BroadcastOperatorSpec(
-            string senderId,
-            IPipelineDataConverter<T1> dataConverter)
-        {
-            SenderId = senderId;
-            Codec = typeof(T2);;
-            PipelineDataConverter = dataConverter ?? new DefaultPipelineDataConverter<T1>();
-        }
-
-        /// <summary>
-        /// Returns the IPipelineDataConverter class type used to convert messages to pipeline form and vice-versa
-        /// </summary>
-        public IPipelineDataConverter<T1> PipelineDataConverter { get; private set; }
-
-        /// <summary>
         /// Returns the identifier of the Task that will broadcast data to other Tasks.
         /// </summary>
         public string SenderId { get; private set; }
 
         /// <summary>
-        /// Returns the ICodec used to serialize messages.
+        /// Returns the Configuration for Codec, ReduceFunction and DataConverter
         /// </summary>
-        public Type Codec { get; private set; }
+        public IConfiguration Configiration { get; private set; }
     }
 }