You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/03/10 05:38:19 UTC

reef git commit: [REEF-1734] Sending Evaluator metrics as ContextMessage and receiving it

Repository: reef
Updated Branches:
  refs/heads/master 9ab622fe4 -> bcfafbc34


[REEF-1734] Sending Evaluator metrics as ContextMessage and receiving it

  * Add MetricsMessageSender as IContextMessageSource
  * Add MetricsService as a ContextMessag ehandler
  * Add test case

JIRA:
  [REEF-1734](https://issues.apache.org/jira/browse/REEF-1734)

Pull Request:
  This closes #1258


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bcfafbc3
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bcfafbc3
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bcfafbc3

Branch: refs/heads/master
Commit: bcfafbc341f17abea6a0836435ae240710783a8c
Parents: 9ab622f
Author: Julia Wang <ju...@apache.org>
Authored: Thu Feb 16 11:46:00 2017 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Mar 9 21:37:35 2017 -0800

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   3 +
 .../MessageSenderConfigurationModule.cs         |  34 +++++++
 .../Telemetry/MetricsMessageSender.cs           |  75 ++++++++++++++
 .../Telemetry/MetricsService.cs                 |  86 ++++++++++++++++
 .../MetricsServiceConfigurationModule.cs        |  38 +++++++
 .../Org.Apache.REEF.Driver.csproj               |   1 +
 .../Functional/Telemetry/MetricsDriver.cs       | 102 +++++++++++++++++++
 .../Functional/Telemetry/MetricsTask.cs         |  61 +++++++++++
 .../Functional/Telemetry/TestMetricsMessage.cs  |  59 +++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   3 +
 10 files changed, 462 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index dc868af..115672b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -252,6 +252,9 @@ under the License.
     <Compile Include="Telemetry\ICounter.cs" />
     <Compile Include="Telemetry\ICounters.cs" />
     <Compile Include="Telemetry\IEvaluatorMetrics.cs" />
+    <Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />
+    <Compile Include="Telemetry\MetricsMessageSender.cs" />
+    <Compile Include="Telemetry\MetricsService.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="Avro\README.md" />

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
new file mode 100644
index 0000000..6230b65
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
@@ -0,0 +1,34 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// It provides ConfigurationModule for MessageSender
+    /// </summary>
+    public sealed class MessageSenderConfigurationModule : ConfigurationModuleBuilder
+    {
+        public static ConfigurationModule ConfigurationModule = new MessageSenderConfigurationModule()
+            .BindSetEntry<ContextConfigurationOptions.ContextMessageSources, MetricsMessageSender, IContextMessageSource>(
+                GenericType<ContextConfigurationOptions.ContextMessageSources>.Class, GenericType<MetricsMessageSender>.Class)
+            .Build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
new file mode 100644
index 0000000..640c483
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
@@ -0,0 +1,75 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// This class implements IContextMessageSource that is responsible to send context message
+    /// </summary>
+    [Unstable("0.16", "The metrics API is in development.")]
+    internal sealed class MetricsMessageSender : IContextMessageSource
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsMessageSender));
+
+        /// <summary>
+        /// IEvaluatorMetrics reference. It keeps the EvaluatorMetrics at context level
+        /// </summary>
+        private readonly IEvaluatorMetrics _evaluatorMetrics;
+
+        /// <summary>
+        /// Id of the context message source
+        /// </summary>
+        private const string MessageSourceId = "ContextMessageSourceID";
+
+        /// <summary>
+        /// The object should be bound as part of the context configuration when submitting context
+        /// </summary>
+        /// <param name="evaluatorMetrics">IEvaluatorMetrics injected to the constructor.</param>
+        [Inject]
+        private MetricsMessageSender(IEvaluatorMetrics evaluatorMetrics)
+        {
+            _evaluatorMetrics = evaluatorMetrics;
+        }
+
+        /// <summary>
+        /// Returns the serialized EvaluatorMetrics as ContextMessage
+        /// </summary>
+        public Optional<ContextMessage> Message
+        {
+            get
+            {
+                var s = _evaluatorMetrics.Serialize();
+                if (s != null)
+                {
+                    return Optional<ContextMessage>.Of(
+                        ContextMessage.From(MessageSourceId,
+                            ByteUtilities.StringToByteArrays(s)));
+                }
+                else
+                {
+                    return Optional<ContextMessage>.Empty();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
new file mode 100644
index 0000000..7ff3c26
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -0,0 +1,86 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Metrics service. It is also a context message handler.
+    /// </summary>
+    [Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
+    internal sealed class MetricsService : IObserver<IContextMessage>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
+        private readonly IDictionary<string, ICounter> _counters = new ConcurrentDictionary<string, ICounter>();
+
+        /// <summary>
+        /// It can be bound with driver configuration as a context message handler
+        /// </summary>
+        [Inject]
+        private MetricsService()
+        {
+        }
+
+        /// <summary>
+        /// It is called whenever context message is received
+        /// </summary>
+        /// <param name="contextMessage">Serialized EvaluatorMetrics</param>
+        public void OnNext(IContextMessage contextMessage)
+        {
+            var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
+            var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
+            Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
+
+            foreach (var counter in counters.GetCounters())
+            {
+                ICounter c;
+                if (_counters.TryGetValue(counter.Name, out c))
+                {
+                    //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
+                    //// if evaluator contains the aggregated values, the value will override existing value
+                    //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
+                    //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
+                    //// We also need to consider failure cases.  
+                    _counters[counter.Name] = counter;
+                }
+                else
+                {
+                    _counters.Add(counter.Name, counter);
+                }
+
+                Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}.", counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp));
+            }
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        public void OnError(Exception error)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
new file mode 100644
index 0000000..b7e75d5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -0,0 +1,38 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Driver
+{
+    public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// It provides the configuration for MetricsService
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule = new MetricsServiceConfigurationModule()
+            .BindSetEntry<DriverBridgeConfigurationOptions.ContextMessageHandlers, MetricsService, IObserver<IContextMessage>>(
+                GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
+                GenericType<MetricsService>.Class)
+            .Build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 61955b9..724582d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -143,6 +143,7 @@ under the License.
     <Compile Include="Evaluator\IEvaluatorRequestor.cs" />
     <Compile Include="Evaluator\IFailedEvaluator.cs" />
     <Compile Include="IJobCancelled.cs" />
+    <Compile Include="MetricsServiceConfigurationModule.cs" />
     <Compile Include="Task\JavaTaskException.cs" />
     <Compile Include="IDriver.cs" />
     <Compile Include="IDriverRestarted.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
new file mode 100644
index 0000000..eb88cf0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -0,0 +1,102 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Messaging;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+    class MetricsDriver :
+        IObserver<IDriverStarted>,
+        IObserver<IAllocatedEvaluator>,
+        IObserver<IActiveContext>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(MessageDriver));
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+        [Inject]
+        public MetricsDriver(IEvaluatorRequestor evaluatorRequestor)
+        {
+            _evaluatorRequestor = evaluatorRequestor;
+        }
+
+        public void OnNext(IDriverStarted value)
+        {
+            var request =
+                _evaluatorRequestor.NewBuilder()
+                    .SetNumber(1)
+                    .SetMegabytes(512)
+                    .SetCores(2)
+                    .SetRackName("WonderlandRack")
+                    .SetEvaluatorBatchId("MetricsEvaluator")
+                    .Build();
+            _evaluatorRequestor.Submit(request);
+        }
+
+        public void OnNext(IAllocatedEvaluator value)
+        {
+            Logger.Log(Level.Info, "Received IAllocatedEvaluator");
+            const string contextId = "ContextID";
+
+            var serviceConfiguration = ServiceConfiguration.ConfigurationModule
+                .Build();
+
+            var contextConfiguration1 = ContextConfiguration.ConfigurationModule
+                .Set(ContextConfiguration.Identifier, contextId)
+                ////.Set(ContextConfiguration.OnSendMessage, GenericType<MetricsMessageSender>.Class)
+                .Build();
+
+            var contextConfiguration2 = MessageSenderConfigurationModule.ConfigurationModule.Build();
+
+            var contextConfiguration = Configurations.Merge(contextConfiguration1, contextConfiguration2);
+            value.SubmitContextAndService(contextConfiguration, serviceConfiguration);
+        }
+
+        public void OnNext(IActiveContext activeContext)
+        {
+            Logger.Log(Level.Info, "Received IActiveContext");
+
+            const string taskId = "TaskID";
+            var taskConfiguration = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, taskId)
+                .Set(TaskConfiguration.Task, GenericType<MetricsTask>.Class)
+                .Build();
+            activeContext.SubmitTask(taskConfiguration);
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
new file mode 100644
index 0000000..88e9461
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
@@ -0,0 +1,61 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+    /// <summary>
+    /// A test task that is to add counter information during the task execution.
+    /// </summary>
+    public class MetricsTask : ITask
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsTask));
+
+        public const string TestCounter1 = "TestCounter1";
+        public const string TestCounter2 = "TestCounter2";
+
+        private readonly ICounters _counters;
+
+        [Inject]
+        private MetricsTask(IEvaluatorMetrics evaluatorMetrics)
+        {
+            _counters = evaluatorMetrics.GetMetricsCounters();
+            _counters.TryRegisterCounter(TestCounter1, "This is " + TestCounter1);
+            _counters.TryRegisterCounter(TestCounter2, "This is " + TestCounter2);
+        }
+
+        public byte[] Call(byte[] memento)
+        {
+            for (int i = 0; i < 100; i++)
+            {
+                _counters.Increment(TestCounter1, 1);
+                _counters.Increment(TestCounter2, 2);
+                Thread.Sleep(100);
+            }
+            return null;
+        }
+
+        public void Dispose()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
new file mode 100644
index 0000000..12eb9f9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -0,0 +1,59 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+    [Collection("FunctionalTests")]
+    public class TestMetricsMessage : ReefFunctionalTest
+    {
+        [Fact]
+        [Trait("Priority", "1")]
+        [Trait("Category", "FunctionalGated")]
+        [Trait("Description", "Test Evaluator Metrics send from evaluator to Metrics Service.")]
+        public void TestMetricsMessages()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(), typeof(MetricsDriver), 1, "sendMessages", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
+            Assert.True(receivedCounterMessage > 1);
+            CleanUp(testFolder);
+        }
+
+        private static IConfiguration DriverConfigurations()
+        {
+            var c1 = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<MetricsDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MetricsDriver>.Class)
+                .Set(DriverConfiguration.OnContextActive, GenericType<MetricsDriver>.Class)
+                .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Build();
+
+            var c2 = MetricsServiceConfigurationModule.ConfigurationModule.Build();
+
+            return Configurations.Merge(c1, c2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index a9f034f..9a35b98 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -161,6 +161,9 @@ under the License.
     <Compile Include="Functional\RuntimeName\RuntimeNameTask.cs" />
     <Compile Include="Functional\RuntimeName\RuntimeNameTest.cs" />
     <Compile Include="Functional\Bridge\TestDisposeTasks.cs" />
+    <Compile Include="Functional\Telemetry\MetricsDriver.cs" />
+    <Compile Include="Functional\Telemetry\MetricsTask.cs" />
+    <Compile Include="Functional\Telemetry\TestMetricsMessage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Utility\TestDriverConfigGenerator.cs" />
     <Compile Include="Utility\TestExceptions.cs" />