You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/03/10 05:38:19 UTC
reef git commit: [REEF-1734] Sending Evaluator metrics as
ContextMessage and receiving it
Repository: reef
Updated Branches:
refs/heads/master 9ab622fe4 -> bcfafbc34
[REEF-1734] Sending Evaluator metrics as ContextMessage and receiving it
* Add MetricsMessageSender as IContextMessageSource
* Add MetricsService as a ContextMessag ehandler
* Add test case
JIRA:
[REEF-1734](https://issues.apache.org/jira/browse/REEF-1734)
Pull Request:
This closes #1258
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bcfafbc3
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bcfafbc3
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bcfafbc3
Branch: refs/heads/master
Commit: bcfafbc341f17abea6a0836435ae240710783a8c
Parents: 9ab622f
Author: Julia Wang <ju...@apache.org>
Authored: Thu Feb 16 11:46:00 2017 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Mar 9 21:37:35 2017 -0800
----------------------------------------------------------------------
.../Org.Apache.REEF.Common.csproj | 3 +
.../MessageSenderConfigurationModule.cs | 34 +++++++
.../Telemetry/MetricsMessageSender.cs | 75 ++++++++++++++
.../Telemetry/MetricsService.cs | 86 ++++++++++++++++
.../MetricsServiceConfigurationModule.cs | 38 +++++++
.../Org.Apache.REEF.Driver.csproj | 1 +
.../Functional/Telemetry/MetricsDriver.cs | 102 +++++++++++++++++++
.../Functional/Telemetry/MetricsTask.cs | 61 +++++++++++
.../Functional/Telemetry/TestMetricsMessage.cs | 59 +++++++++++
.../Org.Apache.REEF.Tests.csproj | 3 +
10 files changed, 462 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index dc868af..115672b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -252,6 +252,9 @@ under the License.
<Compile Include="Telemetry\ICounter.cs" />
<Compile Include="Telemetry\ICounters.cs" />
<Compile Include="Telemetry\IEvaluatorMetrics.cs" />
+ <Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />
+ <Compile Include="Telemetry\MetricsMessageSender.cs" />
+ <Compile Include="Telemetry\MetricsService.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Avro\README.md" />
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
new file mode 100644
index 0000000..6230b65
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
@@ -0,0 +1,34 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// It provides ConfigurationModule for MessageSender
+ /// </summary>
+ public sealed class MessageSenderConfigurationModule : ConfigurationModuleBuilder
+ {
+ public static ConfigurationModule ConfigurationModule = new MessageSenderConfigurationModule()
+ .BindSetEntry<ContextConfigurationOptions.ContextMessageSources, MetricsMessageSender, IContextMessageSource>(
+ GenericType<ContextConfigurationOptions.ContextMessageSources>.Class, GenericType<MetricsMessageSender>.Class)
+ .Build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
new file mode 100644
index 0000000..640c483
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
@@ -0,0 +1,75 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// This class implements IContextMessageSource that is responsible to send context message
+ /// </summary>
+ [Unstable("0.16", "The metrics API is in development.")]
+ internal sealed class MetricsMessageSender : IContextMessageSource
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsMessageSender));
+
+ /// <summary>
+ /// IEvaluatorMetrics reference. It keeps the EvaluatorMetrics at context level
+ /// </summary>
+ private readonly IEvaluatorMetrics _evaluatorMetrics;
+
+ /// <summary>
+ /// Id of the context message source
+ /// </summary>
+ private const string MessageSourceId = "ContextMessageSourceID";
+
+ /// <summary>
+ /// The object should be bound as part of the context configuration when submitting context
+ /// </summary>
+ /// <param name="evaluatorMetrics">IEvaluatorMetrics injected to the constructor.</param>
+ [Inject]
+ private MetricsMessageSender(IEvaluatorMetrics evaluatorMetrics)
+ {
+ _evaluatorMetrics = evaluatorMetrics;
+ }
+
+ /// <summary>
+ /// Returns the serialized EvaluatorMetrics as ContextMessage
+ /// </summary>
+ public Optional<ContextMessage> Message
+ {
+ get
+ {
+ var s = _evaluatorMetrics.Serialize();
+ if (s != null)
+ {
+ return Optional<ContextMessage>.Of(
+ ContextMessage.From(MessageSourceId,
+ ByteUtilities.StringToByteArrays(s)));
+ }
+ else
+ {
+ return Optional<ContextMessage>.Empty();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
new file mode 100644
index 0000000..7ff3c26
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -0,0 +1,86 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Metrics service. It is also a context message handler.
+ /// </summary>
+ [Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
+ internal sealed class MetricsService : IObserver<IContextMessage>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
+ private readonly IDictionary<string, ICounter> _counters = new ConcurrentDictionary<string, ICounter>();
+
+ /// <summary>
+ /// It can be bound with driver configuration as a context message handler
+ /// </summary>
+ [Inject]
+ private MetricsService()
+ {
+ }
+
+ /// <summary>
+ /// It is called whenever context message is received
+ /// </summary>
+ /// <param name="contextMessage">Serialized EvaluatorMetrics</param>
+ public void OnNext(IContextMessage contextMessage)
+ {
+ var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
+ var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
+ Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
+
+ foreach (var counter in counters.GetCounters())
+ {
+ ICounter c;
+ if (_counters.TryGetValue(counter.Name, out c))
+ {
+ //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
+ //// if evaluator contains the aggregated values, the value will override existing value
+ //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
+ //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
+ //// We also need to consider failure cases.
+ _counters[counter.Name] = counter;
+ }
+ else
+ {
+ _counters.Add(counter.Name, counter);
+ }
+
+ Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}.", counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp));
+ }
+ }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
new file mode 100644
index 0000000..b7e75d5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -0,0 +1,38 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Driver
+{
+ public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// It provides the configuration for MetricsService
+ /// </summary>
+ public static ConfigurationModule ConfigurationModule = new MetricsServiceConfigurationModule()
+ .BindSetEntry<DriverBridgeConfigurationOptions.ContextMessageHandlers, MetricsService, IObserver<IContextMessage>>(
+ GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
+ GenericType<MetricsService>.Class)
+ .Build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 61955b9..724582d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -143,6 +143,7 @@ under the License.
<Compile Include="Evaluator\IEvaluatorRequestor.cs" />
<Compile Include="Evaluator\IFailedEvaluator.cs" />
<Compile Include="IJobCancelled.cs" />
+ <Compile Include="MetricsServiceConfigurationModule.cs" />
<Compile Include="Task\JavaTaskException.cs" />
<Compile Include="IDriver.cs" />
<Compile Include="IDriverRestarted.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
new file mode 100644
index 0000000..eb88cf0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -0,0 +1,102 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Messaging;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+ class MetricsDriver :
+ IObserver<IDriverStarted>,
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(MessageDriver));
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+ [Inject]
+ public MetricsDriver(IEvaluatorRequestor evaluatorRequestor)
+ {
+ _evaluatorRequestor = evaluatorRequestor;
+ }
+
+ public void OnNext(IDriverStarted value)
+ {
+ var request =
+ _evaluatorRequestor.NewBuilder()
+ .SetNumber(1)
+ .SetMegabytes(512)
+ .SetCores(2)
+ .SetRackName("WonderlandRack")
+ .SetEvaluatorBatchId("MetricsEvaluator")
+ .Build();
+ _evaluatorRequestor.Submit(request);
+ }
+
+ public void OnNext(IAllocatedEvaluator value)
+ {
+ Logger.Log(Level.Info, "Received IAllocatedEvaluator");
+ const string contextId = "ContextID";
+
+ var serviceConfiguration = ServiceConfiguration.ConfigurationModule
+ .Build();
+
+ var contextConfiguration1 = ContextConfiguration.ConfigurationModule
+ .Set(ContextConfiguration.Identifier, contextId)
+ ////.Set(ContextConfiguration.OnSendMessage, GenericType<MetricsMessageSender>.Class)
+ .Build();
+
+ var contextConfiguration2 = MessageSenderConfigurationModule.ConfigurationModule.Build();
+
+ var contextConfiguration = Configurations.Merge(contextConfiguration1, contextConfiguration2);
+ value.SubmitContextAndService(contextConfiguration, serviceConfiguration);
+ }
+
+ public void OnNext(IActiveContext activeContext)
+ {
+ Logger.Log(Level.Info, "Received IActiveContext");
+
+ const string taskId = "TaskID";
+ var taskConfiguration = TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, taskId)
+ .Set(TaskConfiguration.Task, GenericType<MetricsTask>.Class)
+ .Build();
+ activeContext.SubmitTask(taskConfiguration);
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
new file mode 100644
index 0000000..88e9461
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
@@ -0,0 +1,61 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+ /// <summary>
+ /// A test task that is to add counter information during the task execution.
+ /// </summary>
+ public class MetricsTask : ITask
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsTask));
+
+ public const string TestCounter1 = "TestCounter1";
+ public const string TestCounter2 = "TestCounter2";
+
+ private readonly ICounters _counters;
+
+ [Inject]
+ private MetricsTask(IEvaluatorMetrics evaluatorMetrics)
+ {
+ _counters = evaluatorMetrics.GetMetricsCounters();
+ _counters.TryRegisterCounter(TestCounter1, "This is " + TestCounter1);
+ _counters.TryRegisterCounter(TestCounter2, "This is " + TestCounter2);
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ _counters.Increment(TestCounter1, 1);
+ _counters.Increment(TestCounter2, 2);
+ Thread.Sleep(100);
+ }
+ return null;
+ }
+
+ public void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
new file mode 100644
index 0000000..12eb9f9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -0,0 +1,59 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Telemetry
+{
+ [Collection("FunctionalTests")]
+ public class TestMetricsMessage : ReefFunctionalTest
+ {
+ [Fact]
+ [Trait("Priority", "1")]
+ [Trait("Category", "FunctionalGated")]
+ [Trait("Description", "Test Evaluator Metrics send from evaluator to Metrics Service.")]
+ public void TestMetricsMessages()
+ {
+ string testFolder = DefaultRuntimeFolder + TestId;
+ TestRun(DriverConfigurations(), typeof(MetricsDriver), 1, "sendMessages", "local", testFolder);
+ ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+ string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+ var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
+ Assert.True(receivedCounterMessage > 1);
+ CleanUp(testFolder);
+ }
+
+ private static IConfiguration DriverConfigurations()
+ {
+ var c1 = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<MetricsDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MetricsDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive, GenericType<MetricsDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
+ .Build();
+
+ var c2 = MetricsServiceConfigurationModule.ConfigurationModule.Build();
+
+ return Configurations.Merge(c1, c2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bcfafbc3/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index a9f034f..9a35b98 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -161,6 +161,9 @@ under the License.
<Compile Include="Functional\RuntimeName\RuntimeNameTask.cs" />
<Compile Include="Functional\RuntimeName\RuntimeNameTest.cs" />
<Compile Include="Functional\Bridge\TestDisposeTasks.cs" />
+ <Compile Include="Functional\Telemetry\MetricsDriver.cs" />
+ <Compile Include="Functional\Telemetry\MetricsTask.cs" />
+ <Compile Include="Functional\Telemetry\TestMetricsMessage.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utility\TestDriverConfigGenerator.cs" />
<Compile Include="Utility\TestExceptions.cs" />