You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/03/10 01:33:14 UTC

[05/28] reef git commit: [REEF-1733] Define Driver Metrics and observer

[REEF-1733] Define Driver Metrics and observer

   * Add interface for DriverMetrics and impl
   * Add DriverMetricsObservers and configuration module
   * Let MetricsService implement IDriverMetrics observer
   * Update test driver and test case

JIRA: [REEF-1733](https://issues.apache.org/jira/browse/REEF-1733)

This closes #1342


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/af6c39a2
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/af6c39a2
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/af6c39a2

Branch: refs/heads/REEF-335
Commit: af6c39a214321f7c1969baf694641af3ee9ce828
Parents: e90ffbe
Author: Julia Wang <ju...@apache.org>
Authored: Tue Jul 25 19:29:42 2017 -0700
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Wed Nov 1 12:06:55 2017 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |  4 ++
 .../Telemetry/CountersData.cs                   |  9 +--
 .../Telemetry/DefaultMetricsSink.cs             |  4 +-
 .../Telemetry/DriverMetrics.cs                  | 39 +++++++++++++
 .../DriverMetricsObserverConfigurationModule.cs | 45 +++++++++++++++
 .../Telemetry/DriverMetricsObservers.cs         | 31 ++++++++++
 .../Telemetry/IDriverMetrics.cs                 | 38 +++++++++++++
 .../Telemetry/IMetricsSink.cs                   |  2 +-
 .../Telemetry/MetricsService.cs                 | 34 ++++++++---
 .../Functional/Telemetry/MetricsDriver.cs       | 60 ++++++++++++++++++--
 .../Functional/Telemetry/TestMetricsMessage.cs  | 13 ++++-
 11 files changed, 255 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 518bdfb..8a82948 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -253,9 +253,13 @@ under the License.
     <Compile Include="Telemetry\Counters.cs" />
     <Compile Include="Telemetry\CounterSinkThreshold.cs" />
     <Compile Include="Telemetry\DefaultMetricsSink.cs" />
+    <Compile Include="Telemetry\DriverMetrics.cs" />
+    <Compile Include="Telemetry\DriverMetricsObserverConfigurationModule.cs" />
+    <Compile Include="Telemetry\DriverMetricsObservers.cs" />
     <Compile Include="Telemetry\EvaluatorMetrics.cs" />
     <Compile Include="Telemetry\ICounter.cs" />
     <Compile Include="Telemetry\ICounters.cs" />
+    <Compile Include="Telemetry\IDriverMetrics.cs" />
     <Compile Include="Telemetry\IEvaluatorMetrics.cs" />
     <Compile Include="Telemetry\IMetricsSink.cs" />
     <Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
index 55393b0..b8c22c8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
@@ -81,14 +81,9 @@ namespace Org.Apache.REEF.Common.Telemetry
         /// Convert the counter data into ISet for sink
         /// </summary>
         /// <returns></returns>
-        internal ISet<KeyValuePair<string, string>> GetCounterData()
+        internal IEnumerable<KeyValuePair<string, string>> GetCounterData()
         {
-            var set = new HashSet<KeyValuePair<string, string>>();
-            foreach (var c in _counterMap)
-            {
-                set.Add(c.Value.GetKeyValuePair());
-            }
-            return set;
+            return _counterMap.Select(counter => counter.Value.GetKeyValuePair());
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
index d302812..7f4fd95 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -38,8 +38,8 @@ namespace Org.Apache.REEF.Common.Telemetry
         /// <summary>
         /// Simple sink for metrics data
         /// </summary>
-        /// <param name="metrics"></param>
-        public void Sink(ISet<KeyValuePair<string, string>> metrics)
+        /// <param name="metrics">A collection of metrics data in Key value pair format.</param>
+        public void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
         {
             foreach (var m in metrics)
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
new file mode 100644
index 0000000..2d634e3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// A simple driver metrics.
+    /// It contains system state for now.
+    /// It can be extended later to include more driver metrics data.
+    /// </summary>
+    public sealed class DriverMetrics : IDriverMetrics
+    {
+        public DriverMetrics(string systemState, DateTime timeUpdated)
+        {
+            SystemState = systemState;
+            TimeUpdated = timeUpdated;
+        }
+
+        public string SystemState { get; private set; }
+
+        public DateTime TimeUpdated { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
new file mode 100644
index 0000000..d7e00e5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// It provides ConfigurationModule for DriverMetrics observers
+    /// </summary>
+    public sealed class DriverMetricsObserverConfigurationModule : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Observer of driver metrics
+        /// </summary>
+        public static readonly OptionalImpl<IObserver<IDriverMetrics>> OnDriverMetrics =
+            new OptionalImpl<IObserver<IDriverMetrics>>();
+
+        /// <summary>
+        /// Configuration module for driver metrics observer.
+        /// MetricsService is added as an observer.
+        /// User can set more observers with this configuration module.
+        /// </summary>
+        public readonly static ConfigurationModule ConfigurationModule = new DriverMetricsObserverConfigurationModule()
+            .BindSetEntry(GenericType<DriverMetricsObservers>.Class, OnDriverMetrics)
+            .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
new file mode 100644
index 0000000..21a3f4a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Named parameter for a set of driver metrics observers
+    /// </summary>
+    [NamedParameter(documentation: "Observers for Driver Metrics", shortName: "DriverMetricsObservers")]
+    public class DriverMetricsObservers : Name<ISet<IObserver<IDriverMetrics>>>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
new file mode 100644
index 0000000..4f2c05d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    [Unstable("0.16", "This is to build a simple metrics with system state only. More metrics will be added in future.")]
+    [DefaultImplementation(typeof(DriverMetrics))]
+    public interface IDriverMetrics
+    {
+        /// <summary>
+        /// System state
+        /// </summary>
+        string SystemState { get; }
+
+        /// <summary>
+        /// DateTime that the system state is updated
+        /// </summary>
+        DateTime TimeUpdated { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
index b27bd3d..eef54db 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -28,6 +28,6 @@ namespace Org.Apache.REEF.Common.Telemetry
     [DefaultImplementation(typeof(DefaultMetricsSink))]
     public interface IMetricsSink : IDisposable
     {
-        void Sink(ISet<KeyValuePair<string, string>> metrics);
+        void Sink(IEnumerable<KeyValuePair<string, string>> metrics);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 75c8cc2..d0bf196 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -31,7 +31,7 @@ namespace Org.Apache.REEF.Common.Telemetry
     /// Metrics service. It is also a context message handler.
     /// </summary>
     [Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
-    internal sealed class MetricsService : IObserver<IContextMessage>
+    internal sealed class MetricsService : IObserver<IContextMessage>, IObserver<IDriverMetrics>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
 
@@ -47,7 +47,8 @@ namespace Org.Apache.REEF.Common.Telemetry
 
         /// <summary>
         /// The threshold that triggers the sinks. 
-        /// Currently only one threshold is defined for all the counters. Later, it can be extended to define a threshold per counter.
+        /// Currently only one threshold is defined for all the counters.
+        /// Later, it can be extended to define a threshold per counter.
         /// </summary>
         private readonly int _counterSinkThreshold;
 
@@ -73,7 +74,9 @@ namespace Org.Apache.REEF.Common.Telemetry
         {
             var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
             var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
-            Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
+
+            Logger.Log(Level.Verbose, "Received {0} counters with context message: {1}.",
+                counters.GetCounters().Count(), msgReceived);
 
             _countersData.Update(counters);
 
@@ -87,17 +90,17 @@ namespace Org.Apache.REEF.Common.Telemetry
         /// <summary>
         /// Call each Sink to sink the data in the counters
         /// </summary>
-        private void Sink(ISet<KeyValuePair<string, string>> set)
+        private void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
         {
             foreach (var s in _metricsSinks)
             {
                 try
                 {
-                    Task.Run(() => s.Sink(set));
+                    Task.Run(() => s.Sink(metrics));
                 }
                 catch (Exception e)
                 {
-                    Logger.Log(Level.Error, "Exception happens during the sink for Sink {0} with Exception: {1}.", s.GetType().AssemblyQualifiedName, e);
+                    Logger.Log(Level.Error, "Exception in Sink " + s.GetType().AssemblyQualifiedName, e);
                 }
                 finally
                 {
@@ -108,10 +111,27 @@ namespace Org.Apache.REEF.Common.Telemetry
 
         public void OnCompleted()
         {
+            Logger.Log(Level.Info, "Completed");
         }
 
         public void OnError(Exception error)
         {
+            Logger.Log(Level.Error, "MetricService error", error);
+        }
+
+        /// <summary>
+        /// Observer of IDriverMetrics.
+        /// When Driver metrics data is changed, this method will be called.
+        /// It calls Sink to store/log the metrics data.
+        /// </summary>
+        /// <param name="driverMetrics">driver metrics data.</param>
+        public void OnNext(IDriverMetrics driverMetrics)
+        {
+            Sink(new Dictionary<string, string>()
+            {
+                { "SystemState", driverMetrics.SystemState },
+                { "TimeUpdated", driverMetrics.TimeUpdated.ToLongTimeString() }
+            });
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
index eb88cf0..73df585 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Services;
 using Org.Apache.REEF.Common.Tasks;
@@ -23,6 +24,7 @@ using Org.Apache.REEF.Common.Telemetry;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Util;
@@ -31,22 +33,42 @@ using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Tests.Functional.Telemetry
 {
+    /// <summary>
+    /// Test driver to test metrics
+    /// </summary>
     class MetricsDriver :
         IObserver<IDriverStarted>,
         IObserver<IAllocatedEvaluator>,
-        IObserver<IActiveContext>
+        IObserver<IActiveContext>,
+        IObserver<ICompletedTask>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MessageDriver));
         private readonly IEvaluatorRequestor _evaluatorRequestor;
+        internal const string EventPrefix = "TestState";
 
+        /// <summary>
+        /// a set of driver metrics observers.
+        /// </summary>
+        private readonly ISet<IObserver<IDriverMetrics>> _driverMetricsObservers;
+
+        /// <summary>
+        /// This driver inject DriverMetricsObservers and IDriverMetrics.
+        /// It keeps updating the driver metrics when receiving events. 
+        /// </summary>
+        /// <param name="evaluatorRequestor"></param>
+        /// <param name="driverMetricsObservers"></param>
         [Inject]
-        public MetricsDriver(IEvaluatorRequestor evaluatorRequestor)
+        public MetricsDriver(IEvaluatorRequestor evaluatorRequestor,
+            [Parameter(typeof(DriverMetricsObservers))] ISet<IObserver<IDriverMetrics>> driverMetricsObservers)
         {
             _evaluatorRequestor = evaluatorRequestor;
+            _driverMetricsObservers = driverMetricsObservers;
         }
 
         public void OnNext(IDriverStarted value)
         {
+            UpdateMetrics(TestSystemState.DriverStarted);
+
             var request =
                 _evaluatorRequestor.NewBuilder()
                     .SetNumber(1)
@@ -61,14 +83,14 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
         public void OnNext(IAllocatedEvaluator value)
         {
             Logger.Log(Level.Info, "Received IAllocatedEvaluator");
-            const string contextId = "ContextID";
+            UpdateMetrics(TestSystemState.EvaluatorAllocated);
 
+            const string contextId = "ContextID";
             var serviceConfiguration = ServiceConfiguration.ConfigurationModule
                 .Build();
 
             var contextConfiguration1 = ContextConfiguration.ConfigurationModule
                 .Set(ContextConfiguration.Identifier, contextId)
-                ////.Set(ContextConfiguration.OnSendMessage, GenericType<MetricsMessageSender>.Class)
                 .Build();
 
             var contextConfiguration2 = MessageSenderConfigurationModule.ConfigurationModule.Build();
@@ -80,6 +102,7 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
         public void OnNext(IActiveContext activeContext)
         {
             Logger.Log(Level.Info, "Received IActiveContext");
+            UpdateMetrics(TestSystemState.ActiveContextReceived);
 
             const string taskId = "TaskID";
             var taskConfiguration = TaskConfiguration.ConfigurationModule
@@ -89,6 +112,14 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
             activeContext.SubmitTask(taskConfiguration);
         }
 
+        public void OnNext(ICompletedTask value)
+        {
+            Logger.Log(Level.Info, "Received ICompletedTask");
+            UpdateMetrics(TestSystemState.TaskCompleted);
+
+            value.ActiveContext.Dispose();
+        }
+
         public void OnCompleted()
         {
             throw new NotImplementedException();
@@ -98,5 +129,26 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
         {
             throw new NotImplementedException();
         }
+
+        /// <summary>
+        /// Call metrics observers with driver metrics data
+        /// </summary>
+        private void UpdateMetrics(TestSystemState systemState)
+        {
+            var driverMetrics = new DriverMetrics(EventPrefix + systemState, DateTime.Now);
+
+            foreach (var metricsObserver in _driverMetricsObservers)
+            {
+                metricsObserver.OnNext(driverMetrics);
+            }
+        }
+    }
+
+    internal enum TestSystemState
+    {
+        DriverStarted,
+        EvaluatorAllocated,
+        ActiveContextReceived,
+        TaskCompleted
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index f447f75..0f10ac8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -40,24 +40,31 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
             string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
             var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
             Assert.True(receivedCounterMessage > 1);
+
+            var messageCount = GetMessageCount(lines, MetricsDriver.EventPrefix);
+            Assert.Equal(4, messageCount);
+
             CleanUp(testFolder);
         }
 
         private static IConfiguration DriverConfigurations()
         {
-            var c1 = DriverConfiguration.ConfigurationModule
+            var driverBasicConfig = DriverConfiguration.ConfigurationModule
                 .Set(DriverConfiguration.OnDriverStarted, GenericType<MetricsDriver>.Class)
                 .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MetricsDriver>.Class)
                 .Set(DriverConfiguration.OnContextActive, GenericType<MetricsDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, GenericType<MetricsDriver>.Class)
                 .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
                 .Build();
 
-            var c2 = MetricsServiceConfigurationModule.ConfigurationModule
+            var metricServiceConfig = MetricsServiceConfigurationModule.ConfigurationModule
                 .Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
                 .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
                 .Build();
 
-            return Configurations.Merge(c1, c2);
+            var driverMetricConfig = DriverMetricsObserverConfigurationModule.ConfigurationModule.Build();
+
+            return Configurations.Merge(driverBasicConfig, metricServiceConfig, driverMetricConfig);
         }
     }
 }