You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/03/10 01:33:14 UTC
[05/28] reef git commit: [REEF-1733] Define Driver Metrics and
observer
[REEF-1733] Define Driver Metrics and observer
* Add interface for DriverMetrics and impl
* Add DriverMetricsObservers and configuration module
* Let MetricsService implement IDriverMetrics observer
* Update test driver and test case
JIRA: [REEF-1733](https://issues.apache.org/jira/browse/REEF-1733)
This closes #1342
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/af6c39a2
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/af6c39a2
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/af6c39a2
Branch: refs/heads/REEF-335
Commit: af6c39a214321f7c1969baf694641af3ee9ce828
Parents: e90ffbe
Author: Julia Wang <ju...@apache.org>
Authored: Tue Jul 25 19:29:42 2017 -0700
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Wed Nov 1 12:06:55 2017 -0700
----------------------------------------------------------------------
.../Org.Apache.REEF.Common.csproj | 4 ++
.../Telemetry/CountersData.cs | 9 +--
.../Telemetry/DefaultMetricsSink.cs | 4 +-
.../Telemetry/DriverMetrics.cs | 39 +++++++++++++
.../DriverMetricsObserverConfigurationModule.cs | 45 +++++++++++++++
.../Telemetry/DriverMetricsObservers.cs | 31 ++++++++++
.../Telemetry/IDriverMetrics.cs | 38 +++++++++++++
.../Telemetry/IMetricsSink.cs | 2 +-
.../Telemetry/MetricsService.cs | 34 ++++++++---
.../Functional/Telemetry/MetricsDriver.cs | 60 ++++++++++++++++++--
.../Functional/Telemetry/TestMetricsMessage.cs | 13 ++++-
11 files changed, 255 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 518bdfb..8a82948 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -253,9 +253,13 @@ under the License.
<Compile Include="Telemetry\Counters.cs" />
<Compile Include="Telemetry\CounterSinkThreshold.cs" />
<Compile Include="Telemetry\DefaultMetricsSink.cs" />
+ <Compile Include="Telemetry\DriverMetrics.cs" />
+ <Compile Include="Telemetry\DriverMetricsObserverConfigurationModule.cs" />
+ <Compile Include="Telemetry\DriverMetricsObservers.cs" />
<Compile Include="Telemetry\EvaluatorMetrics.cs" />
<Compile Include="Telemetry\ICounter.cs" />
<Compile Include="Telemetry\ICounters.cs" />
+ <Compile Include="Telemetry\IDriverMetrics.cs" />
<Compile Include="Telemetry\IEvaluatorMetrics.cs" />
<Compile Include="Telemetry\IMetricsSink.cs" />
<Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
index 55393b0..b8c22c8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
@@ -81,14 +81,9 @@ namespace Org.Apache.REEF.Common.Telemetry
/// Convert the counter data into ISet for sink
/// </summary>
/// <returns></returns>
- internal ISet<KeyValuePair<string, string>> GetCounterData()
+ internal IEnumerable<KeyValuePair<string, string>> GetCounterData()
{
- var set = new HashSet<KeyValuePair<string, string>>();
- foreach (var c in _counterMap)
- {
- set.Add(c.Value.GetKeyValuePair());
- }
- return set;
+ return _counterMap.Select(counter => counter.Value.GetKeyValuePair());
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
index d302812..7f4fd95 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -38,8 +38,8 @@ namespace Org.Apache.REEF.Common.Telemetry
/// <summary>
/// Simple sink for metrics data
/// </summary>
- /// <param name="metrics"></param>
- public void Sink(ISet<KeyValuePair<string, string>> metrics)
+ /// <param name="metrics">A collection of metrics data in Key value pair format.</param>
+ public void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
{
foreach (var m in metrics)
{
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
new file mode 100644
index 0000000..2d634e3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// A simple implementation of IDriverMetrics.
+ /// It currently carries only the system state and the time it was updated;
+ /// it can be extended later to include more driver metrics data.
+ /// </summary>
+ public sealed class DriverMetrics : IDriverMetrics
+ {
+ public DriverMetrics(string systemState, DateTime timeUpdated) // stores both arguments as given; no validation is performed
+ {
+ SystemState = systemState;
+ TimeUpdated = timeUpdated;
+ }
+
+ public string SystemState { get; private set; } // assigned only by the constructor
+
+ public DateTime TimeUpdated { get; private set; } // assigned only by the constructor
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
new file mode 100644
index 0000000..d7e00e5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Provides the ConfigurationModule that binds the DriverMetricsObservers set.
+ /// </summary>
+ public sealed class DriverMetricsObserverConfigurationModule : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// Optional additional observer of driver metrics; when set, it becomes an entry of DriverMetricsObservers.
+ /// </summary>
+ public static readonly OptionalImpl<IObserver<IDriverMetrics>> OnDriverMetrics =
+ new OptionalImpl<IObserver<IDriverMetrics>>();
+
+ /// <summary>
+ /// Configuration module for driver metrics observers.
+ /// MetricsService is always bound as one entry of the DriverMetricsObservers set.
+ /// An implementation set through OnDriverMetrics is bound as a further entry.
+ /// </summary>
+ public static readonly ConfigurationModule ConfigurationModule = new DriverMetricsObserverConfigurationModule()
+ .BindSetEntry(GenericType<DriverMetricsObservers>.Class, OnDriverMetrics)
+ .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
+ .Build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
new file mode 100644
index 0000000..21a3f4a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Named parameter whose value is the set of driver metrics observers (IObserver of IDriverMetrics).
+ /// </summary>
+ [NamedParameter(documentation: "Observers for Driver Metrics", shortName: "DriverMetricsObservers")]
+ public class DriverMetricsObservers : Name<ISet<IObserver<IDriverMetrics>>>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
new file mode 100644
index 0000000..4f2c05d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ [Unstable("0.16", "This is to build a simple metrics with system state only. More metrics will be added in future.")]
+ [DefaultImplementation(typeof(DriverMetrics))]
+ public interface IDriverMetrics // read-only view of driver metrics; DriverMetrics is the declared default implementation
+ {
+ /// <summary>
+ /// The system state, represented as a string.
+ /// </summary>
+ string SystemState { get; }
+
+ /// <summary>
+ /// The time at which the system state was updated.
+ /// </summary>
+ DateTime TimeUpdated { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
index b27bd3d..eef54db 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -28,6 +28,6 @@ namespace Org.Apache.REEF.Common.Telemetry
[DefaultImplementation(typeof(DefaultMetricsSink))]
public interface IMetricsSink : IDisposable
{
- void Sink(ISet<KeyValuePair<string, string>> metrics);
+ void Sink(IEnumerable<KeyValuePair<string, string>> metrics);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 75c8cc2..d0bf196 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -31,7 +31,7 @@ namespace Org.Apache.REEF.Common.Telemetry
/// Metrics service. It is also a context message handler.
/// </summary>
[Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
- internal sealed class MetricsService : IObserver<IContextMessage>
+ internal sealed class MetricsService : IObserver<IContextMessage>, IObserver<IDriverMetrics>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
@@ -47,7 +47,8 @@ namespace Org.Apache.REEF.Common.Telemetry
/// <summary>
/// The threshold that triggers the sinks.
- /// Currently only one threshold is defined for all the counters. Later, it can be extended to define a threshold per counter.
+ /// Currently only one threshold is defined for all the counters.
+ /// Later, it can be extended to define a threshold per counter.
/// </summary>
private readonly int _counterSinkThreshold;
@@ -73,7 +74,9 @@ namespace Org.Apache.REEF.Common.Telemetry
{
var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
- Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
+
+ Logger.Log(Level.Verbose, "Received {0} counters with context message: {1}.",
+ counters.GetCounters().Count(), msgReceived);
_countersData.Update(counters);
@@ -87,17 +90,17 @@ namespace Org.Apache.REEF.Common.Telemetry
/// <summary>
/// Call each Sink to sink the data in the counters
/// </summary>
- private void Sink(ISet<KeyValuePair<string, string>> set)
+ private void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
{
foreach (var s in _metricsSinks)
{
try
{
- Task.Run(() => s.Sink(set));
+ Task.Run(() => s.Sink(metrics));
}
catch (Exception e)
{
- Logger.Log(Level.Error, "Exception happens during the sink for Sink {0} with Exception: {1}.", s.GetType().AssemblyQualifiedName, e);
+ Logger.Log(Level.Error, "Exception in Sink " + s.GetType().AssemblyQualifiedName, e);
}
finally
{
@@ -108,10 +111,27 @@ namespace Org.Apache.REEF.Common.Telemetry
public void OnCompleted()
{
+ Logger.Log(Level.Info, "Completed");
}
public void OnError(Exception error)
{
+ Logger.Log(Level.Error, "MetricService error", error);
+ }
+
+ /// <summary>
+ /// Observer of IDriverMetrics.
+ /// When Driver metrics data is changed, this method will be called.
+ /// It calls Sink with the system state and a culture-invariant round-trip ("o") timestamp that keeps the date.
+ /// </summary>
+ /// <param name="driverMetrics">driver metrics data.</param>
+ public void OnNext(IDriverMetrics driverMetrics)
+ {
+ Sink(new Dictionary<string, string>()
+ {
+ { "SystemState", driverMetrics.SystemState },
+ { "TimeUpdated", driverMetrics.TimeUpdated.ToString("o", System.Globalization.CultureInfo.InvariantCulture) }
+ });
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
index eb88cf0..73df585 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Collections.Generic;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Common.Tasks;
@@ -23,6 +24,7 @@ using Org.Apache.REEF.Common.Telemetry;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Util;
@@ -31,22 +33,42 @@ using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Tests.Functional.Telemetry
{
+ /// <summary>
+ /// Test driver to test metrics
+ /// </summary>
class MetricsDriver :
IObserver<IDriverStarted>,
IObserver<IAllocatedEvaluator>,
- IObserver<IActiveContext>
+ IObserver<IActiveContext>,
+ IObserver<ICompletedTask>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MessageDriver));
private readonly IEvaluatorRequestor _evaluatorRequestor;
+ internal const string EventPrefix = "TestState";
+ /// <summary>
+ /// a set of driver metrics observers.
+ /// </summary>
+ private readonly ISet<IObserver<IDriverMetrics>> _driverMetricsObservers;
+
+ /// <summary>
+ /// The driver is injected with the set of driver metrics observers (DriverMetricsObservers).
+ /// It notifies those observers with updated driver metrics when it receives driver events.
+ /// </summary>
+ /// <param name="evaluatorRequestor"></param>
+ /// <param name="driverMetricsObservers"></param>
[Inject]
- public MetricsDriver(IEvaluatorRequestor evaluatorRequestor)
+ public MetricsDriver(IEvaluatorRequestor evaluatorRequestor,
+ [Parameter(typeof(DriverMetricsObservers))] ISet<IObserver<IDriverMetrics>> driverMetricsObservers)
{
_evaluatorRequestor = evaluatorRequestor;
+ _driverMetricsObservers = driverMetricsObservers;
}
public void OnNext(IDriverStarted value)
{
+ UpdateMetrics(TestSystemState.DriverStarted);
+
var request =
_evaluatorRequestor.NewBuilder()
.SetNumber(1)
@@ -61,14 +83,14 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
public void OnNext(IAllocatedEvaluator value)
{
Logger.Log(Level.Info, "Received IAllocatedEvaluator");
- const string contextId = "ContextID";
+ UpdateMetrics(TestSystemState.EvaluatorAllocated);
+ const string contextId = "ContextID";
var serviceConfiguration = ServiceConfiguration.ConfigurationModule
.Build();
var contextConfiguration1 = ContextConfiguration.ConfigurationModule
.Set(ContextConfiguration.Identifier, contextId)
- ////.Set(ContextConfiguration.OnSendMessage, GenericType<MetricsMessageSender>.Class)
.Build();
var contextConfiguration2 = MessageSenderConfigurationModule.ConfigurationModule.Build();
@@ -80,6 +102,7 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
public void OnNext(IActiveContext activeContext)
{
Logger.Log(Level.Info, "Received IActiveContext");
+ UpdateMetrics(TestSystemState.ActiveContextReceived);
const string taskId = "TaskID";
var taskConfiguration = TaskConfiguration.ConfigurationModule
@@ -89,6 +112,14 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
activeContext.SubmitTask(taskConfiguration);
}
+ public void OnNext(ICompletedTask value)
+ {
+ Logger.Log(Level.Info, "Received ICompletedTask");
+ UpdateMetrics(TestSystemState.TaskCompleted); // report the TaskCompleted state to the metrics observers
+
+ value.ActiveContext.Dispose(); // dispose the active context associated with the completed task
+ }
+
public void OnCompleted()
{
throw new NotImplementedException();
@@ -98,5 +129,26 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
{
throw new NotImplementedException();
}
+
+ /// <summary>
+ /// Builds a DriverMetrics for the given state, stamped with DateTime.Now, and passes it to every driver metrics observer.
+ /// </summary>
+ private void UpdateMetrics(TestSystemState systemState)
+ {
+ var driverMetrics = new DriverMetrics(EventPrefix + systemState, DateTime.Now); // state string is EventPrefix followed by the enum member name
+
+ foreach (var metricsObserver in _driverMetricsObservers)
+ {
+ metricsObserver.OnNext(driverMetrics); // the same metrics instance is handed to each observer
+ }
+ }
+ }
+
+ internal enum TestSystemState // states this test driver reports through UpdateMetrics
+ {
+ DriverStarted, // reported from OnNext(IDriverStarted)
+ EvaluatorAllocated, // reported from OnNext(IAllocatedEvaluator)
+ ActiveContextReceived, // reported from OnNext(IActiveContext)
+ TaskCompleted // reported from OnNext(ICompletedTask)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/af6c39a2/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index f447f75..0f10ac8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -40,24 +40,31 @@ namespace Org.Apache.REEF.Tests.Functional.Telemetry
string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
Assert.True(receivedCounterMessage > 1);
+
+ var messageCount = GetMessageCount(lines, MetricsDriver.EventPrefix);
+ Assert.Equal(4, messageCount);
+
CleanUp(testFolder);
}
private static IConfiguration DriverConfigurations()
{
- var c1 = DriverConfiguration.ConfigurationModule
+ var driverBasicConfig = DriverConfiguration.ConfigurationModule
.Set(DriverConfiguration.OnDriverStarted, GenericType<MetricsDriver>.Class)
.Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<MetricsDriver>.Class)
.Set(DriverConfiguration.OnContextActive, GenericType<MetricsDriver>.Class)
+ .Set(DriverConfiguration.OnTaskCompleted, GenericType<MetricsDriver>.Class)
.Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build();
- var c2 = MetricsServiceConfigurationModule.ConfigurationModule
+ var metricServiceConfig = MetricsServiceConfigurationModule.ConfigurationModule
.Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
.Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
.Build();
- return Configurations.Merge(c1, c2);
+ var driverMetricConfig = DriverMetricsObserverConfigurationModule.ConfigurationModule.Build();
+
+ return Configurations.Merge(driverBasicConfig, metricServiceConfig, driverMetricConfig);
}
}
}