You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/05/12 02:40:05 UTC

reef git commit: [REEF-1735] Define IMetricsSink interface

Repository: reef
Updated Branches:
  refs/heads/master 39adc451b -> b90a3f690


[REEF-1735] Define IMetricsSink interface

* Add IMetricsSink interface and default impl
* Add named parameter for MetricsSinks
* Add MetricsSinks to MetricsService
* Modify test cases

JIRA:
  [REEF-1735](https://issues.apache.org/jira/browse/REEF-1735)

Pull request
  This closes #1262


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b90a3f69
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b90a3f69
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b90a3f69

Branch: refs/heads/master
Commit: b90a3f690e859fa23319abe33fdbc259007fa393
Parents: 39adc45
Author: Julia Wang <ju...@apache.org>
Authored: Tue Mar 14 11:52:53 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 11 19:39:00 2017 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   6 ++
 .../Properties/AssemblyInfo.cs                  |   6 ++
 .../Telemetry/CounterData.cs                    |  77 ++++++++++++++
 .../Telemetry/CounterSinkThreshold.cs           |  26 +++++
 .../Telemetry/CountersData.cs                   | 103 +++++++++++++++++++
 .../Telemetry/DefaultMetricsSink.cs             |  57 ++++++++++
 .../Telemetry/IMetricsSink.cs                   |  33 ++++++
 .../Telemetry/MetricSinks.cs                    |  31 ++++++
 .../Telemetry/MetricsService.cs                 |  63 +++++++++---
 .../MetricsServiceConfigurationModule.cs        |   8 ++
 .../Functional/Telemetry/TestMetricsMessage.cs  |   6 +-
 11 files changed, 399 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 8feb0e1..4bdf723 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -247,12 +247,18 @@ under the License.
     <Compile Include="Tasks\TaskConfigurationOptions.cs" />
     <Compile Include="Tasks\TaskMessage.cs" />
     <Compile Include="Telemetry\Counter.cs" />
+    <Compile Include="Telemetry\CounterData.cs" />
+    <Compile Include="Telemetry\CountersData.cs" />
     <Compile Include="Telemetry\Counters.cs" />
+    <Compile Include="Telemetry\CounterSinkThreshold.cs" />
+    <Compile Include="Telemetry\DefaultMetricsSink.cs" />
     <Compile Include="Telemetry\EvaluatorMetrics.cs" />
     <Compile Include="Telemetry\ICounter.cs" />
     <Compile Include="Telemetry\ICounters.cs" />
     <Compile Include="Telemetry\IEvaluatorMetrics.cs" />
+    <Compile Include="Telemetry\IMetricsSink.cs" />
     <Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />
+    <Compile Include="Telemetry\MetricSinks.cs" />
     <Compile Include="Telemetry\MetricsMessageSender.cs" />
     <Compile Include="Telemetry\MetricsService.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
index 4a8c546..d126a7e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
@@ -62,6 +62,12 @@ using System.Runtime.InteropServices;
  "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" +
  "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
 
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" +
+ "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" +
+ "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" +
+ "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+
 // Allow NSubstitute to create proxy implementations
 [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=002400000480000" +
  "0940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a36" +

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
new file mode 100644
index 0000000..5f23262
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Pairs the most recently received ICounter with the increment accumulated since the last sink; UpdateCounter adds the delta between the new and the stored value.
+    /// </summary>
+    internal sealed class CounterData
+    {
+        /// <summary>
+        /// Latest counter object received; replaced on every UpdateCounter call
+        /// </summary>
+        private ICounter _counter;
+
+        /// <summary>
+        /// Counter increment accumulated since the last ResetSinceLastSink call (seeded by the constructor's initialValue)
+        /// </summary>
+        internal int IncrementSinceLastSink { get; private set; }
+
+        /// <summary>
+        /// Constructor for CounterData; stores the counter and seeds the increment since last sink
+        /// </summary>
+        /// <param name="counter">The counter object to wrap</param>
+        /// <param name="initialValue">Starting value for IncrementSinceLastSink</param>
+        internal CounterData(ICounter counter, int initialValue)
+        {
+            _counter = counter;
+            IncrementSinceLastSink = initialValue;
+        }
+
+        /// <summary>
+        /// Clear the increment since last sink by setting it back to 0
+        /// </summary>
+        internal void ResetSinceLastSink()
+        {
+            IncrementSinceLastSink = 0;
+        }
+
+        internal void UpdateCounter(ICounter counter)
+        {
+            IncrementSinceLastSink += counter.Value - _counter.Value;
+
+            //// TODO: [REEF-1748] The following cases need to be considered when determining how to update the counter:
+            //// if the evaluator contains the aggregated values, the value will override the existing value
+            //// if the evaluator only keeps the delta, the value should be added here. But the value in the evaluator should be reset after the message is sent
+            //// For counters from multiple evaluators with the same counter name, the value should be aggregated here
+            //// We also need to consider failure cases.
+            _counter = counter;
+        }
+
+        /// <summary>
+        /// Get counter name and value as KeyValuePair
+        /// </summary>
+        /// <returns>Pair of the stored counter's Name and its Value converted with ToString()</returns>
+        internal KeyValuePair<string, string> GetKeyValuePair()
+        {
+            return new KeyValuePair<string, string>(_counter.Name, _counter.Value.ToString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
new file mode 100644
index 0000000..0f458c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
+    public class CounterSinkThreshold : Name<int> // Tang named parameter (int, default "1") carrying the threshold that triggers the sink.
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
new file mode 100644
index 0000000..55393b0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// This class maintains a collection of the data for all the counters for the metrics service, keyed by counter name.
+    /// When new counter data is received, Update() refreshes the collection; after the data is processed, Reset() clears the increment since last sink.
+    /// NOTE(review): Update() calls TryGetValue followed by Add, which is not atomic - confirm Update() is never called concurrently.
+    /// </summary>
+    internal sealed class CountersData
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(CountersData));
+
+        /// <summary>
+        /// Registration of counters: counter data keyed by counter name
+        /// </summary>
+        private readonly IDictionary<string, CounterData> _counterMap = new ConcurrentDictionary<string, CounterData>();
+
+        [Inject]
+        private CountersData()
+        {            
+        }
+
+        /// <summary>
+        /// Update counters: an already registered name gets UpdateCounter(), a new name is registered with the counter's current value as its initial increment
+        /// </summary>
+        /// <param name="counters">The counters whose values are merged into the collection</param>
+        internal void Update(ICounters counters)
+        {
+            foreach (var counter in counters.GetCounters())
+            {
+                CounterData counterData;
+                if (_counterMap.TryGetValue(counter.Name, out counterData))
+                {
+                    counterData.UpdateCounter(counter);
+                }
+                else
+                {
+                    _counterMap.Add(counter.Name, new CounterData(counter, counter.Value));
+                }
+
+                Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3},  incrementSinceLastSink: {4}.",
+                    counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp), _counterMap[counter.Name].IncrementSinceLastSink);
+            }
+        }
+
+        /// <summary>
+        /// Reset the increment since last sink for each registered counter
+        /// </summary>
+        internal void Reset()
+        {
+            foreach (var c in _counterMap.Values)
+            {
+                c.ResetSinceLastSink();
+            }
+        }
+
+        /// <summary>
+        /// Convert the counter data into a newly created ISet for the sink, one entry per registered counter
+        /// </summary>
+        /// <returns>A new HashSet holding the name/value pair returned by each counter's GetKeyValuePair()</returns>
+        internal ISet<KeyValuePair<string, string>> GetCounterData()
+        {
+            var set = new HashSet<KeyValuePair<string, string>>();
+            foreach (var c in _counterMap)
+            {
+                set.Add(c.Value.GetKeyValuePair());
+            }
+            return set;
+        }
+
+        /// <summary>
+        /// The condition that triggers the sink: the sum of IncrementSinceLastSink over all counters is compared with counterSinkThreshold. The condition can be modified later.
+        /// </summary>
+        /// <returns>True when the summed increment is strictly greater than counterSinkThreshold</returns>
+        internal bool TriggerSink(int counterSinkThreshold)
+        {
+            return _counterMap.Values.Sum(e => e.IncrementSinceLastSink) > counterSinkThreshold;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
new file mode 100644
index 0000000..d302812
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// This default IMetricsSink is just an example implementation of IMetricsSink.
+    /// Here the data is only written to the log in the Sink() method.
+    /// It is mainly useful in tests.
+    /// </summary>
+    internal sealed class DefaultMetricsSink : IMetricsSink
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultMetricsSink));
+
+        [Inject]
+        private DefaultMetricsSink()
+        {
+        }
+
+        /// <summary>
+        /// Simple sink for metrics data: logs every entry at Info level as a name/value pair
+        /// </summary>
+        /// <param name="metrics">Set of (metric name, metric value) pairs to log</param>
+        public void Sink(ISet<KeyValuePair<string, string>> metrics)
+        {
+            foreach (var m in metrics)
+            {
+                Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value);
+            }
+        }
+
+        /// <summary>
+        /// This is intentionally empty as this implementation does not hold any resource to release.
+        /// </summary>
+        public void Dispose()
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
new file mode 100644
index 0000000..b27bd3d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Interface for a metrics sink: a disposable object whose Sink() receives the metrics as a set of name/value string pairs.
+    /// It is used to output IMRU metrics; DefaultMetricsSink is declared as the default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(DefaultMetricsSink))]
+    public interface IMetricsSink : IDisposable
+    {
+        void Sink(ISet<KeyValuePair<string, string>> metrics);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs
new file mode 100644
index 0000000..09b7598
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// A named parameter for a set of IMetricsSink; DefaultMetricsSink is listed as its default class.
+    /// </summary>
+    [NamedParameter(DefaultClasses = new Type[] { typeof(DefaultMetricsSink) })]
+    public sealed class MetricSinks : Name<ISet<IMetricsSink>>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 7ff3c26..75c8cc2 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -16,9 +16,9 @@
 // under the License.
 
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading.Tasks;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
@@ -34,14 +34,35 @@ namespace Org.Apache.REEF.Common.Telemetry
     internal sealed class MetricsService : IObserver<IContextMessage>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
-        private readonly IDictionary<string, ICounter> _counters = new ConcurrentDictionary<string, ICounter>();
+
+        /// <summary>
+        /// Contains Counters received in the Metrics service
+        /// </summary>
+        private readonly CountersData _countersData;
+
+        /// <summary>
+        /// A set of metrics sinks
+        /// </summary>
+        private readonly ISet<IMetricsSink> _metricsSinks;
+
+        /// <summary>
+        /// The threshold that triggers the sinks. 
+        /// Currently only one threshold is defined for all the counters. Later, it can be extended to define a threshold per counter.
+        /// </summary>
+        private readonly int _counterSinkThreshold;
 
         /// <summary>
         /// It can be bound with driver configuration as a context message handler
         /// </summary>
         [Inject]
-        private MetricsService()
+        private MetricsService(
+            [Parameter(typeof(MetricSinks))] ISet<IMetricsSink> metricsSinks,
+            [Parameter(typeof(CounterSinkThreshold))] int counterSinkThreshold,
+            CountersData countersData)
         {
+            _metricsSinks = metricsSinks;
+            _counterSinkThreshold = counterSinkThreshold;
+            _countersData = countersData;
         }
 
         /// <summary>
@@ -54,24 +75,34 @@ namespace Org.Apache.REEF.Common.Telemetry
             var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
             Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
 
-            foreach (var counter in counters.GetCounters())
+            _countersData.Update(counters);
+
+            if (_countersData.TriggerSink(_counterSinkThreshold))
             {
-                ICounter c;
-                if (_counters.TryGetValue(counter.Name, out c))
+                Sink(_countersData.GetCounterData());
+                _countersData.Reset();
+            }
+        }
+
+        /// <summary>
+        /// Call each Sink to sink the data in the counters. NOTE(review): Task.Run is not awaited, so the catch cannot observe exceptions thrown inside s.Sink and the finally disposes each sink immediately, on every call - confirm this is intended.
+        /// </summary>
+        private void Sink(ISet<KeyValuePair<string, string>> set)
+        {
+            foreach (var s in _metricsSinks)
+            {
+                try
                 {
-                    //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
-                    //// if evaluator contains the aggregated values, the value will override existing value
-                    //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
-                    //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
-                    //// We also need to consider failure cases.  
-                    _counters[counter.Name] = counter;
+                    Task.Run(() => s.Sink(set));
                 }
-                else
+                catch (Exception e)
                 {
-                    _counters.Add(counter.Name, counter);
+                    Logger.Log(Level.Error, "Exception happens during the sink for Sink {0} with Exception: {1}.", s.GetType().AssemblyQualifiedName, e);
+                }
+                finally
+                {
+                    s.Dispose();
                 }
-
-                Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}.", counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
index b7e75d5..957d505 100644
--- a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -24,8 +24,14 @@ using Org.Apache.REEF.Tang.Util;
 
 namespace Org.Apache.REEF.Driver
 {
+    /// <summary>
+    /// Configuration module for MetricsService.
+    /// </summary>
     public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
     {
+        public static readonly OptionalImpl<IMetricsSink> OnMetricsSink = new OptionalImpl<IMetricsSink>();
+        public static readonly OptionalParameter<int> CounterSinkThreshold = new OptionalParameter<int>();
+
         /// <summary>
         /// It provides the configuration for MetricsService
         /// </summary>
@@ -33,6 +39,8 @@ namespace Org.Apache.REEF.Driver
             .BindSetEntry<DriverBridgeConfigurationOptions.ContextMessageHandlers, MetricsService, IObserver<IContextMessage>>(
                 GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
                 GenericType<MetricsService>.Class)
+            .BindSetEntry(GenericType<MetricSinks>.Class, OnMetricsSink)
+            .BindNamedParameter(GenericType<CounterSinkThreshold>.Class, CounterSinkThreshold)
             .Build();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b90a3f69/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index 12eb9f9..f447f75 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Org.Apache.REEF.Common.Telemetry;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Interface;
@@ -51,7 +52,10 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
                 .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
                 .Build();
 
-            var c2 = MetricsServiceConfigurationModule.ConfigurationModule.Build();
+            var c2 = MetricsServiceConfigurationModule.ConfigurationModule
+                .Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
+                .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
+                .Build();
 
             return Configurations.Merge(c1, c2);
         }