You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:43 UTC
[22/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure. This is to change the .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
deleted file mode 100644
index 052764d..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Context;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Tang.Annotations;
-using Org.Apache.Reef.Tang.Interface;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Time;
-using Org.Apache.Reef.Wake.Time.Runtime.Event;
-using System;
-using System.Globalization;
-
-namespace Org.Apache.Reef.Common
-{
- public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
-
- private readonly string _evaluatorId;
-
- private readonly ContextManager _contextManager;
-
- private readonly HeartBeatManager _heartBeatManager;
-
- private readonly IRemoteManager<REEFMessage> _remoteManager;
-
- private readonly IClock _clock;
-
- private State _state = State.INIT;
-
- private IDisposable _evaluatorControlChannel;
-
- [Inject]
- public EvaluatorRuntime(
- ContextManager contextManager,
- HeartBeatManager heartBeatManager)
- {
- using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
- {
- _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
- _heartBeatManager = heartBeatManager;
- _contextManager = contextManager;
- _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
- _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
-
- ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
-
- // subscribe to driver proto message
- driverObserver.Subscribe(o => OnNext(o.Message));
-
- // register the driver observer
- _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
-
- // start the hearbeat
- _clock.ScheduleAlarm(0, heartBeatManager);
- }
- }
-
- public State State
- {
- get
- {
- return _state;
- }
- }
-
- public void Handle(EvaluatorControlProto message)
- {
- lock (_heartBeatManager)
- {
- LOGGER.Log(Level.Info, "Handle Evaluator control message");
- if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
- {
- Handle(new InvalidOperationException(
- string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
- }
- else if (_state != State.RUNNING)
- {
- Handle(new InvalidOperationException(
- string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
- }
- else
- {
- if (message.context_control != null)
- {
- LOGGER.Log(Level.Info, "Send task control message to ContextManager");
- try
- {
- _contextManager.HandleTaskControl(message.context_control);
- if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
- {
- LOGGER.Log(Level.Info, "Context stack is empty, done");
- _state = State.DONE;
- _heartBeatManager.OnNext(GetEvaluatorStatus());
- _clock.Dispose();
- }
- }
- catch (Exception e)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
- Handle(e);
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
- }
- }
- if (message.kill_evaluator != null)
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
- _state = State.KILLED;
- _clock.Dispose();
- }
- }
- }
- }
-
- public EvaluatorStatusProto GetEvaluatorStatus()
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
- EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
- {
- evaluator_id = _evaluatorId,
- state = _state
- };
- return evaluatorStatusProto;
- }
-
- public void OnNext(RuntimeStart runtimeStart)
- {
- lock (_evaluatorId)
- {
- try
- {
- LOGGER.Log(Level.Info, "Runtime start");
- if (_state != State.INIT)
- {
- var e = new InvalidOperationException("State should be init.");
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- _state = State.RUNNING;
- _contextManager.Start();
- _heartBeatManager.OnNext();
- }
- catch (Exception e)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
- Handle(e);
- }
- }
- }
-
- void IObserver<RuntimeStart>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<REEFMessage>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<REEFMessage>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStop>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStop>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStart>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- public void OnNext(RuntimeStop runtimeStop)
- {
- LOGGER.Log(Level.Info, "Runtime stop");
- _contextManager.Dispose();
-
- if (_state == State.RUNNING)
- {
- _state = State.DONE;
- _heartBeatManager.OnNext();
- }
- try
- {
- _evaluatorControlChannel.Dispose();
- }
- catch (Exception e)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
- }
- LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
- }
-
- public void OnNext(REEFMessage value)
- {
- if (value != null && value.evaluatorControl != null)
- {
- LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
- Handle(value.evaluatorControl);
- }
- }
-
- private void Handle(Exception e)
- {
- lock (_heartBeatManager)
- {
- LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
- _state = State.FAILED;
- string errorMessage = string.Format(
- CultureInfo.InvariantCulture,
- "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
- e,
- e.Message,
- e.StackTrace);
- EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
- {
- evaluator_id = _evaluatorId,
- error = ByteUtilities.StringToByteArrays(errorMessage),
- state = _state
- };
- _heartBeatManager.OnNext(evaluatorStatusProto);
- _contextManager.Dispose();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
deleted file mode 100644
index 067a0a0..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Evaluator;
-using Org.Apache.Reef.Common.Evaluator.Context;
-using Org.Apache.Reef.Common.io;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Tang.Interface;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Time;
-using System;
-
-namespace Org.Apache.Reef.Evaluator
-{
- // TODO: merge with EvaluatorConfigurations class
- public class EvaluatorSettings
- {
- private string _applicationId;
-
- private string _evaluatorId;
-
- private int _heartBeatPeriodInMs;
-
- private int _maxHeartbeatRetries;
-
- private ContextConfiguration _rootContextConfig;
-
- private IClock _clock;
-
- private IRemoteManager<REEFMessage> _remoteManager;
-
- private IInjector _injector;
-
- private EvaluatorOperationState _operationState;
-
- private INameClient _nameClient;
-
- public EvaluatorSettings(
- string applicationId,
- string evaluatorId,
- int heartbeatPeriodInMs,
- int maxHeartbeatRetries,
- ContextConfiguration rootContextConfig,
- IClock clock,
- IRemoteManager<REEFMessage> remoteManager,
- IInjector injecor)
- {
- if (string.IsNullOrWhiteSpace(evaluatorId))
- {
- throw new ArgumentNullException("evaluatorId");
- }
- if (rootContextConfig == null)
- {
- throw new ArgumentNullException("rootContextConfig");
- }
- if (clock == null)
- {
- throw new ArgumentNullException("clock");
- }
- if (remoteManager == null)
- {
- throw new ArgumentNullException("remoteManager");
- }
- if (injecor == null)
- {
- throw new ArgumentNullException("injecor");
- }
- _applicationId = applicationId;
- _evaluatorId = evaluatorId;
- _heartBeatPeriodInMs = heartbeatPeriodInMs;
- _maxHeartbeatRetries = maxHeartbeatRetries;
- _rootContextConfig = rootContextConfig;
- _clock = clock;
- _remoteManager = remoteManager;
- _injector = injecor;
- _operationState = EvaluatorOperationState.OPERATIONAL;
- }
-
- public EvaluatorOperationState OperationState
- {
- get
- {
- return _operationState;
- }
-
- set
- {
- _operationState = value;
- }
- }
-
- public string EvalutorId
- {
- get
- {
- return _evaluatorId;
- }
- }
-
- public int HeartBeatPeriodInMs
- {
- get
- {
- return _heartBeatPeriodInMs;
- }
- }
-
- public string ApplicationId
- {
- get
- {
- return _applicationId;
- }
- }
-
- public int MaxHeartbeatFailures
- {
- get
- {
- return _maxHeartbeatRetries;
- }
- }
-
- public ContextConfiguration RootContextConfig
- {
- get
- {
- return _rootContextConfig;
- }
- }
-
- public IClock RuntimeClock
- {
- get
- {
- return _clock;
- }
- }
-
- public INameClient NameClient
- {
- get
- {
- return _nameClient;
- }
-
- set
- {
- _nameClient = value;
- }
- }
-
- public IRemoteManager<REEFMessage> RemoteManager
- {
- get
- {
- return _remoteManager;
- }
- }
-
- public IInjector Injector
- {
- get
- {
- return _injector;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
deleted file mode 100644
index e495fda..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Context;
-using Org.Apache.Reef.Common.Evaluator;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Common.Runtime;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Remote.Impl;
-using Org.Apache.Reef.Wake.Time;
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using System.Threading;
-
-namespace Org.Apache.Reef.Common
-{
- public class HeartBeatManager : IObserver<Alarm>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
-
- private static readonly MachineStatus MachineStatus = new MachineStatus();
-
- private readonly IRemoteManager<REEFMessage> _remoteManager;
-
- private readonly IClock _clock;
-
- private readonly int _heartBeatPeriodInMillSeconds;
-
- private readonly int _maxHeartbeatRetries = 0;
-
- private readonly string _evaluatorId;
-
- private IRemoteIdentifier _remoteId;
-
- private IObserver<REEFMessage> _observer;
-
- private int _heartbeatFailures = 0;
-
- private IDriverConnection _driverConnection;
-
- private EvaluatorSettings _evaluatorSettings;
-
- // the queue can only contains the following:
- // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
- // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat)
- private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
-
- public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
- {
- using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
- {
- _remoteManager = settings.RemoteManager;
- _remoteId = remoteId;
- _evaluatorId = settings.EvalutorId;
- _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
- _clock = settings.RuntimeClock;
- _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
- _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
- EvaluatorSettings = settings;
- MachineStatus.ToString(); // kick start the CPU perf counter
- }
- }
-
- [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
- public EvaluatorRuntime _evaluatorRuntime { get; set; }
-
- [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
- public ContextManager _contextManager { get; set; }
-
- public EvaluatorSettings EvaluatorSettings
- {
- get
- {
- return _evaluatorSettings;
- }
-
- private set
- {
- _evaluatorSettings = value;
- }
- }
-
- public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
- {
- lock (_queuedHeartbeats)
- {
- if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
- {
- LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
- _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
- return;
- }
-
- // NOT during recovery, try to send
- REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
- try
- {
- _observer.OnNext(payload);
- _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures
- }
- catch (Exception e)
- {
- if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
- }
-
- _heartbeatFailures++;
-
- _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
- LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
-
- if (_heartbeatFailures >= _maxHeartbeatRetries)
- {
- LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
- try
- {
- _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
- }
- catch (Exception ex)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
- }
- LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
- _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
-
- // clean heartbeat failure
- _heartbeatFailures = 0;
- }
- }
- }
- }
-
- /// <summary>
- /// Assemble a complete new heartbeat and send it out.
- /// </summary>
- public void OnNext()
- {
- LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
- lock (this)
- {
- LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
- EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
- Send(heartbeatProto);
- }
- }
-
- /// <summary>
- /// Called with a specific TaskStatus that must be delivered to the driver
- /// </summary>
- /// <param name="taskStatusProto"></param>
- public void OnNext(TaskStatusProto taskStatusProto)
- {
- LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
- lock (this)
- {
- LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
- EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
- _evaluatorRuntime.GetEvaluatorStatus(),
- _contextManager.GetContextStatusCollection(),
- Optional<TaskStatusProto>.Of(taskStatusProto));
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
- Send(heartbeatProto);
- }
- }
-
- /// <summary>
- /// Called with a specific ContextStatusProto that must be delivered to the driver
- /// </summary>
- /// <param name="contextStatusProto"></param>
- public void OnNext(ContextStatusProto contextStatusProto)
- {
- LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
- lock (this)
- {
- LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
- List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>();
- contextStatusProtos.Add(contextStatusProto);
- contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
- EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
- _evaluatorRuntime.GetEvaluatorStatus(),
- contextStatusProtos,
- Optional<TaskStatusProto>.Empty());
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
- Send(heartbeatProto);
- }
- }
-
- /// <summary>
- /// Called with a specific EvaluatorStatus that must be delivered to the driver
- /// </summary>
- /// <param name="evaluatorStatusProto"></param>
- public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
- {
- LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
- lock (this)
- {
- LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
- EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
- {
- timestamp = CurrentTimeMilliSeconds(),
- evaluator_status = evaluatorStatusProto
- };
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
- Send(heartbeatProto);
- }
- }
-
- public void OnNext(Alarm value)
- {
- LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
- lock (this)
- {
- LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
- if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
- {
- EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
- LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
- Send(evaluatorHeartbeatProto);
- _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
- }
- else
- {
- LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State));
- try
- {
- DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
- if (driverInformation == null)
- {
- LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
- }
- else
- {
- LOGGER.Log(
- Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
- Recover(driverInformation);
- }
- }
- catch (Exception e)
- {
- // we do not want any exception to stop the query for driver status
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
- }
- _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
- }
- }
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- private static long CurrentTimeMilliSeconds()
- {
- // this is an implmenation to get current time milli second counted from Jan 1st, 1970
- // it is chose as such to be compatible with java implmentation
- DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
- return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
- }
-
- private void Recover(DriverInformation driverInformation)
- {
- IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
- _remoteId = new SocketRemoteIdentifier(driverEndpoint);
- _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
- lock (_evaluatorSettings)
- {
- if (_evaluatorSettings.NameClient != null)
- {
- try
- {
- LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
- _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
- LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
- }
- catch (Exception e)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
- }
- }
- }
-
- lock (_queuedHeartbeats)
- {
- bool firstHeartbeatInQueue = true;
- while (_queuedHeartbeats.Any())
- {
- LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
- try
- {
- if (firstHeartbeatInQueue)
- {
- // first heartbeat is specially construted to include the recovery flag
- EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
- LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
- _observer.OnNext(new REEFMessage(recoveryHeartbeat));
- firstHeartbeatInQueue = false;
- }
- else
- {
- _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
- }
- }
- catch (Exception e)
- {
- // we do not handle failures during RECOVERY
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
- e,
- Level.Error,
- string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
- LOGGER);
- }
- Thread.Sleep(500);
- }
- }
- _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
- LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
- }
-
- private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
- {
- heartbeat.recovery = true;
- heartbeat.context_status.ForEach(c => c.recovery = true);
- heartbeat.task_status.recovery = true;
- return heartbeat;
- }
-
- private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
- {
- return GetEvaluatorHeartbeatProto(
- _evaluatorRuntime.GetEvaluatorStatus(),
- _contextManager.GetContextStatusCollection(),
- _contextManager.GetTaskStatus());
- }
-
- private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
- EvaluatorStatusProto evaluatorStatusProto,
- ICollection<ContextStatusProto> contextStatusProtos,
- Optional<TaskStatusProto> taskStatusProto)
- {
- EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
- {
- timestamp = CurrentTimeMilliSeconds(),
- evaluator_status = evaluatorStatusProto
- };
- foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
- {
- evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
- }
- if (taskStatusProto.IsPresent())
- {
- evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
- }
- return evaluatorHeartbeatProto;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
deleted file mode 100644
index 8a7aa94..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote;
-using System;
-using System.Globalization;
-using System.Threading;
-
-namespace Org.Apache.Reef.Common
-{
- public class ReefMessageProtoObserver :
- IObserver<IRemoteMessage<REEFMessage>>,
- IObservable<IRemoteMessage<REEFMessage>>,
- IDisposable
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
- private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
- private long _count = 0;
- private DateTime _begin;
- private DateTime _origBegin;
-
- public void OnCompleted()
- {
- }
-
- public void OnError(Exception error)
- {
- }
-
- public void OnNext(IRemoteMessage<REEFMessage> value)
- {
- REEFMessage remoteEvent = value.Message;
- IRemoteIdentifier id = value.Identifier;
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
-
- if (remoteEvent.evaluatorControl != null)
- {
- if (remoteEvent.evaluatorControl.context_control != null)
- {
- string context_message = null;
- string task_message = null;
-
- if (remoteEvent.evaluatorControl.context_control.context_message != null)
- {
- context_message = remoteEvent.evaluatorControl.context_control.context_message.ToString();
- }
- if (remoteEvent.evaluatorControl.context_control.task_message != null)
- {
- task_message = ByteUtilities.ByteArrarysToString(remoteEvent.evaluatorControl.context_control.task_message);
- }
-
- if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
- {
- LOGGER.Log(Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
- }
- else if (remoteEvent.evaluatorControl.context_control.remove_context != null)
- {
- LOGGER.Log(Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", remoteEvent.evaluatorControl.context_control.remove_context.context_id));
- }
- else if (remoteEvent.evaluatorControl.context_control.add_context != null)
- {
- LOGGER.Log(Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", remoteEvent.evaluatorControl.context_control.add_context.parent_context_id));
- }
- else if (remoteEvent.evaluatorControl.context_control.start_task != null)
- {
- LOGGER.Log(Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", remoteEvent.evaluatorControl.context_control.start_task.context_id));
- }
- else if (remoteEvent.evaluatorControl.context_control.stop_task != null)
- {
- LOGGER.Log(Level.Info, "Control protobuf to stop task");
- }
- else if (remoteEvent.evaluatorControl.context_control.suspend_task != null)
- {
- LOGGER.Log(Level.Info, "Control protobuf to suspend task");
- }
- }
- }
- if (_count == 0)
- {
- _begin = DateTime.Now;
- _origBegin = _begin;
- }
- var count = Interlocked.Increment(ref _count);
-
- int printBatchSize = 100000;
- if (count % printBatchSize == 0)
- {
- DateTime end = DateTime.Now;
- var diff = (end - _begin).TotalMilliseconds;
- double seconds = diff / 1000.0;
- long eventsPerSecond = (long)(printBatchSize / seconds);
- _begin = DateTime.Now;
- }
-
- var observer = _observer;
- if (observer != null)
- {
- observer.OnNext(value);
- }
- }
-
- public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
- {
- if (_observer != null)
- {
- return null;
- }
- _observer = observer;
- return this;
- }
-
- public void Dispose()
- {
- _observer = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
deleted file mode 100644
index 31194a7..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Tang.Interface;
-using System;
-
-namespace Org.Apache.Reef.Common.Context
-{
- /// <summary>
- /// Exception describing a failure in a context. It records the failing context's id and,
- /// optionally, the id of its parent.
- /// </summary>
- public class ContextClientCodeException : Exception
- {
- /// <summary>
- /// Constructs the exception. The message is prefixed with the failing context's id.
- /// </summary>
- /// <param name="contextId"> the id of the failed context.</param>
- /// <param name="parentId"> the id of the failed context's parent, if any.</param>
- /// <param name="message"> the error message </param>
- /// <param name="cause"> the exception that caused the error</param>
- public ContextClientCodeException(
- string contextId,
- Optional<string> parentId,
- string message,
- Exception cause)
- : base(string.Concat("Failure in context '", contextId, "': ", message), cause)
- {
- ContextId = contextId;
- ParentId = parentId;
- }
-
- /// <summary>
- /// The id of the context that failed.
- /// </summary>
- public string ContextId { get; private set; }
-
- /// <summary>
- /// The id of the failed context's parent, when one was supplied.
- /// </summary>
- public Optional<string> ParentId { get; private set; }
-
- /// <summary>
- /// Extracts a context id from the given configuration.
- /// Not implemented yet: it currently ignores the configuration and returns an empty string.
- /// </summary>
- /// <param name="c">the configuration to inspect (currently unused)</param>
- /// <returns>string.Empty until the TODO below is resolved.</returns>
- public static string GetId(IConfiguration c)
- {
- // TODO: update after TANG is available
- return string.Empty;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
deleted file mode 100644
index ca6b949..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Tang.Formats;
-using Org.Apache.Reef.Tang.Interface;
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using Org.Apache.Reef.Tang.Types;
-
-namespace Org.Apache.Reef.Common.Evaluator.Context
-{
- /// <summary>
- /// A minimal IConfiguration built from an embedded Avro configuration string.
- /// Only the flat key/value settings, the context Id and ContainerDirectory are supported;
- /// every Tang-level query method below throws NotImplementedException.
- /// </summary>
- public class ContextConfiguration : IConfiguration
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
-
- // Flat key/value bindings read from the Avro configuration; assigned once in the constructor.
- private readonly Dictionary<string, string> _settings;
-
- /// <summary>
- /// Parses the embedded Avro configuration string into key/value settings.
- /// Any binding whose key contains Constants.ContextIdentifier is stored under exactly that key.
- /// </summary>
- /// <param name="configString">the configuration, as accepted by AvroConfiguration.GetAvroConfigurationFromEmbeddedString</param>
- /// <exception cref="ArgumentException">
- /// Dictionary.Add throws it on duplicate keys, including two bindings that both normalize to the
- /// context identifier. A missing context identifier is also reported as an ArgumentException via
- /// Exceptions.Throw (presumably thrown there; confirm).
- /// </exception>
- public ContextConfiguration(string configString)
- {
- using (LOGGER.LogFunction("ContextConfiguration::ContextConfiguration"))
- {
- ContainerDirectory = Directory.GetCurrentDirectory();
-
- _settings = new Dictionary<string, string>();
- AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
- foreach (ConfigurationEntry config in avroConfiguration.Bindings)
- {
- // The identifier binding may carry a longer key that merely contains the identifier name;
- // normalize it (mutating the entry) so Id can look it up by the exact constant.
- if (config.key.Contains(Reef.Evaluator.Constants.ContextIdentifier))
- {
- config.key = Reef.Evaluator.Constants.ContextIdentifier;
- LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", config.key, config.value));
- }
- _settings.Add(config.key, config.value);
- }
- if (!_settings.ContainsKey(Reef.Evaluator.Constants.ContextIdentifier))
- {
- string msg = "Required parameter ContextIdentifier not provided.";
- LOGGER.Log(Level.Error, msg);
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
- }
- }
- }
-
- /// <summary>
- /// The context identifier read from the configuration. The constructor rejects configurations that lack it.
- /// </summary>
- public string Id
- {
- get { return _settings[Reef.Evaluator.Constants.ContextIdentifier]; }
- }
-
- /// <summary>
- /// Working directory for the context. It is initialised to the process's current directory at construction time.
- /// </summary>
- public string ContainerDirectory { get; set; }
-
- // The IConfiguration members below are not supported by this lightweight configuration.
-
- public IConfigurationBuilder newBuilder()
- {
- throw new NotImplementedException();
- }
-
- public string GetNamedParameter(INamedParameterNode np)
- {
- throw new NotImplementedException();
- }
-
- public IClassHierarchy GetClassHierarchy()
- {
- throw new NotImplementedException();
- }
-
- public ISet<object> GetBoundSet(INamedParameterNode np)
- {
- throw new NotImplementedException();
- }
-
- public IClassNode GetBoundConstructor(IClassNode cn)
- {
- throw new NotImplementedException();
- }
-
- public IClassNode GetBoundImplementation(IClassNode cn)
- {
- throw new NotImplementedException();
- }
-
- public IConstructorDef GetLegacyConstructor(IClassNode cn)
- {
- throw new NotImplementedException();
- }
-
- public ICollection<IClassNode> GetBoundImplementations()
- {
- throw new NotImplementedException();
- }
-
- public ICollection<IClassNode> GetBoundConstructors()
- {
- throw new NotImplementedException();
- }
-
- public ICollection<INamedParameterNode> GetNamedParameters()
- {
- throw new NotImplementedException();
- }
-
- public ICollection<IClassNode> GetLegacyConstructors()
- {
- throw new NotImplementedException();
- }
-
- public IList<object> GetBoundList(INamedParameterNode np)
- {
- throw new NotImplementedException();
- }
-
- public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
- {
- throw new NotImplementedException();
- }
-
- public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
- {
- throw new NotImplementedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
deleted file mode 100644
index 9967258..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Events;
-using System;
-using System.Collections.Generic;
-
-namespace Org.Apache.Reef.Common.Context
-{
- /// <summary>
- /// Holds the handlers tied to one context's life cycle: start observers, stop observers and
- /// message sources. Dispatching to the start/stop observers is not enabled yet (see the TODOs).
- /// </summary>
- class ContextLifeCycle
- {
- private readonly HashSet<IObserver<IContextStart>> _startObservers;
-
- private readonly HashSet<IObserver<IContextStop>> _stopObservers;
-
- private readonly HashSet<IContextMessageSource> _messageSources;
-
- // @Inject
- public ContextLifeCycle(
- string id,
- HashSet<IObserver<IContextStart>> contextStartHandlers,
- HashSet<IObserver<IContextStop>> contextStopHandlers,
- HashSet<IContextMessageSource> contextMessageSources)
- {
- Id = id;
- _startObservers = contextStartHandlers;
- _stopObservers = contextStopHandlers;
- _messageSources = contextMessageSources;
- }
-
- /// <summary>
- /// Creates a life cycle for the given context with empty handler and message-source sets.
- /// </summary>
- /// <param name="contextId">the id of the context this life cycle belongs to</param>
- public ContextLifeCycle(string contextId)
- : this(
- contextId,
- new HashSet<IObserver<IContextStart>>(),
- new HashSet<IObserver<IContextStop>>(),
- new HashSet<IContextMessageSource>())
- {
- }
-
- public string Id { get; private set; }
-
- /// <summary>
- /// The live (not copied) set of configured message sources.
- /// </summary>
- public HashSet<IContextMessageSource> ContextMessageSources
- {
- get { return _messageSources; }
- }
-
- /// <summary>
- /// Fires ContextStart to all registered event handlers.
- /// </summary>
- public void Start()
- {
- IContextStart startEvent = new ContextStartImpl(Id);
-
- // TODO: enable -- pass startEvent to OnNext of every observer in _startObservers.
- }
-
- /// <summary>
- /// Fires ContextStop to all registered event handlers.
- /// </summary>
- public void Close()
- {
- // TODO: enable -- build a ContextStopImpl for Id and pass it to OnNext of every observer in _stopObservers.
- }
-
- /// <summary>
- /// Handles a message addressed to this context. It is currently a no-op.
- /// </summary>
- /// <param name="message">the raw message payload</param>
- public void HandleContextMessage(byte[] message)
- {
- // TODO: forward message to a context message handler once one is available.
- }
-
- /// <summary>
- /// get the set of ContextMessageSources configured
- /// </summary>
- /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns>
- public HashSet<IContextMessageSource> GetContextMessageSources()
- {
- HashSet<IContextMessageSource> snapshot = new HashSet<IContextMessageSource>(_messageSources);
- return snapshot;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
deleted file mode 100644
index 15b09b9..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Evaluator.Context;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Common.Task;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Services;
-using Org.Apache.Reef.Tasks;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using System;
-using System.Collections.Generic;
-using System.Collections.ObjectModel;
-using System.Globalization;
-using System.Linq;
-
-namespace Org.Apache.Reef.Common.Context
-{
- /// <summary>
- /// Maintains the evaluator's stack of ContextRuntime instances (root at the bottom) and applies
- /// ContextControlProto commands (add/remove context, start/stop/suspend task, messages) to it.
- /// </summary>
- public class ContextManager : IDisposable
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
-
- // Active contexts: the root is pushed first and children go on top. Also used as the lock object.
- private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
-
- private readonly HeartBeatManager _heartBeatManager;
-
- private readonly RootContextLauncher _rootContextLauncher;
-
- /// <summary>
- /// Creates the manager. The root context is obtained from the launcher and pushed in Start().
- /// </summary>
- /// <param name="heartBeatManager">used to report status changes; also supplies the root context configuration via EvaluatorSettings</param>
- /// <param name="rootServiceConfig">optional service configuration for the root context</param>
- /// <param name="rootTaskConfig">optional task to launch on the root context in Start()</param>
- public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
- {
- using (LOGGER.LogFunction("ContextManager::ContextManager"))
- {
- _heartBeatManager = heartBeatManager;
- _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig);
- }
- }
-
- /// <summary>
- /// Start the context manager. This pushes the root context and, if a root task configuration
- /// is present, launches that task on it. A TaskClientCodeException from the launch is reported
- /// to the driver as a failed task rather than propagated.
- /// </summary>
- public void Start()
- {
- lock (_contextStack)
- {
- ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
- _contextStack.Push(rootContext);
-
- if (_rootContextLauncher.RootTaskConfig.IsPresent())
- {
- LOGGER.Log(Level.Info, "Launching the initial Task");
- try
- {
- rootContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
- }
- catch (TaskClientCodeException e)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
- HandleTaskException(e);
- }
- }
- }
- }
-
- /// <summary>
- /// Returns true when no context is on the stack.
- /// </summary>
- public bool ContextStackIsEmpty()
- {
- lock (_contextStack)
- {
- return (_contextStack.Count == 0);
- }
- }
-
- // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later
-
- /// <summary>
- /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
- /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
- /// Client-code exceptions are reported to the driver via heartbeat and then rethrown.
- /// </summary>
- /// <param name="controlMessage">the control message received from the driver</param>
- public void HandleTaskControl(ContextControlProto controlMessage)
- {
- try
- {
- byte[] message = controlMessage.task_message;
- if (controlMessage.add_context != null && controlMessage.remove_context != null)
- {
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
- }
- if (controlMessage.add_context != null)
- {
- LOGGER.Log(Level.Info, "AddContext");
- AddContext(controlMessage.add_context);
- // support submitContextAndTask()
- if (controlMessage.start_task != null)
- {
- LOGGER.Log(Level.Info, "StartTask");
- StartTask(controlMessage.start_task);
- }
- else
- {
- // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
- // Therefore this call can not go into addContext
- LOGGER.Log(Level.Info, "Trigger Heartbeat");
- _heartBeatManager.OnNext();
- }
- }
- else if (controlMessage.remove_context != null)
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
- RemoveContext(controlMessage.remove_context.context_id);
- }
- else if (controlMessage.start_task != null)
- {
- LOGGER.Log(Level.Info, "StartTask only");
- StartTask(controlMessage.start_task);
- }
- else if (controlMessage.stop_task != null)
- {
- LOGGER.Log(Level.Info, "CloseTask");
- _contextStack.Peek().CloseTask(message);
- }
- else if (controlMessage.suspend_task != null)
- {
- LOGGER.Log(Level.Info, "SuspendTask");
- _contextStack.Peek().SuspendTask(message);
- }
- else if (controlMessage.task_message != null)
- {
- LOGGER.Log(Level.Info, "DeliverTaskMessage");
- _contextStack.Peek().DeliverTaskMessage(message);
- }
- else if (controlMessage.context_message != null)
- {
- LOGGER.Log(Level.Info, "Handle context control message");
- ContextMessageProto contextMessageProto = controlMessage.context_message;
- bool deliveredMessage = false;
- foreach (ContextRuntime context in _contextStack)
- {
- if (context.Id.Equals(contextMessageProto.context_id))
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
- context.HandleContextMessaage(controlMessage.context_message.message);
- deliveredMessage = true;
- break;
- }
- }
- if (!deliveredMessage)
- {
- InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- }
- else
- {
- InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- }
- catch (Exception e)
- {
- TaskClientCodeException taskException = e as TaskClientCodeException;
- ContextClientCodeException contextException = e as ContextClientCodeException;
- if (taskException != null)
- {
- HandleTaskException(taskException);
- }
- else if (contextException != null)
- {
- HandleContextException(contextException);
- }
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
- }
- }
-
- /// <summary>
- /// Get TaskStatusProto of the currently running task, if there is any
- /// </summary>
- /// <returns>the status from the top context, or Empty when the context stack is empty</returns>
- public Optional<TaskStatusProto> GetTaskStatus()
- {
- if (_contextStack.Count == 0)
- {
- return Optional<TaskStatusProto>.Empty();
- }
- return _contextStack.Peek().GetTaskStatus();
- }
-
- /// <summary>
- /// get status of all contexts in the stack.
- /// </summary>
- /// <returns>the status of all contexts in the stack, in stack enumeration order (top first).</returns>
- public ICollection<ContextStatusProto> GetContextStatusCollection()
- {
- ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
- foreach (ContextRuntime runtime in _contextStack)
- {
- ContextStatusProto contextStatusProto = runtime.GetContextStatus();
- LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
- result.Add(contextStatusProto);
- }
- return result;
- }
-
- /// <summary>
- /// Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
- /// starting at the top.
- /// </summary>
- public void Dispose()
- {
- lock (_contextStack)
- {
- if (_contextStack.Count > 0)
- {
- LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
- // Enumerable.Last() walks a Stack<T> from top to bottom, so this is the bottom-most (root) context.
- // NOTE(review): presumably ContextRuntime.Dispose tears down child contexts and the task first; confirm.
- ContextRuntime runtime = _contextStack.Last();
- if (runtime != null)
- {
- runtime.Dispose();
- }
- }
- }
- }
-
- /// <summary>
- /// Add a context to the stack. The new context is spawned as a child of the current top context,
- /// which must match the requested parent id.
- /// </summary>
- /// <param name="addContextProto">the requested parent id plus the context (and optional service) configuration</param>
- private void AddContext(AddContextProto addContextProto)
- {
- lock (_contextStack)
- {
- ContextRuntime currentTopContext = _contextStack.Peek();
- if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase))
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
- addContextProto.parent_context_id,
- currentTopContext.Id));
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- string contextConfigString = addContextProto.context_configuration;
- ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString);
- ContextRuntime newTopContext;
- if (addContextProto.service_configuration != null)
- {
- ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
- newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
- }
- else
- {
- newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
- }
- _contextStack.Push(newTopContext);
- }
- }
-
- /// <summary>
- /// Remove the context with the given ID from the stack. Only the current top context may be removed.
- /// </summary>
- /// <param name="contextId"> context id</param>
- private void RemoveContext(string contextId)
- {
- lock (_contextStack)
- {
- ContextRuntime currentTopContext = _contextStack.Peek();
- string currentTopContextId = currentTopContext.Id;
- if (!contextId.Equals(currentTopContextId, StringComparison.OrdinalIgnoreCase))
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId));
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- currentTopContext.Dispose();
- if (_contextStack.Count > 1)
- {
- // We did not close the root context. Therefore, we need to inform the
- // driver explicitly that this context is closed. The root context notification
- // is implicit in the Evaluator close/done notification.
- _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
- }
- _contextStack.Pop();
- }
- }
-
- /// <summary>
- /// Launch a Task on the current top context, which must match the context id in the request.
- /// </summary>
- /// <param name="startTaskProto">the target context id and the task configuration</param>
- private void StartTask(StartTaskProto startTaskProto)
- {
- lock (_contextStack)
- {
- ContextRuntime currentActiveContext = _contextStack.Peek();
- string expectedContextId = startTaskProto.context_id;
- if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase))
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
- Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
- }
- TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
- currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
- }
- }
-
- /// <summary>
- /// Reports a failed task to the driver through a heartbeat.
- /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
- /// </summary>
- /// <param name="e">the task failure; its text is sent as the task result</param>
- private void HandleTaskException(TaskClientCodeException e)
- {
- LOGGER.Log(Level.Error, "TaskClientCodeException", e);
- byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
- TaskStatusProto taskStatus = new TaskStatusProto()
- {
- context_id = e.ContextId,
- task_id = e.TaskId,
- result = exception,
- state = State.FAILED
- };
- LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed task: {0}", taskStatus.ToString()));
- _heartBeatManager.OnNext(taskStatus);
- }
-
- /// <summary>
- /// Reports a failed context (and its parent id, if known) to the driver through a heartbeat.
- /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
- /// </summary>
- /// <param name="e">the context failure; its text is sent as the error payload</param>
- private void HandleContextException(ContextClientCodeException e)
- {
- LOGGER.Log(Level.Error, "ContextClientCodeException", e);
- byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
- ContextStatusProto contextStatusProto = new ContextStatusProto()
- {
- context_id = e.ContextId,
- context_state = ContextStatusProto.State.FAIL,
- error = exception
- };
- if (e.ParentId.IsPresent())
- {
- contextStatusProto.parent_id = e.ParentId.Value;
- }
- LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
- _heartBeatManager.OnNext(contextStatusProto);
- }
- }
-}
\ No newline at end of file