You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/02 22:00:02 UTC
reef git commit: [REEF-1679] Evaluator shouldn't go to recovery mode
if there is no reconnect logic provided
Repository: reef
Updated Branches:
refs/heads/master c0ddcc9c9 -> dd14f0910
[REEF-1679] Evaluator shouldn't go to recovery mode if there is no reconnect logic provided
If no IDriverConnection is bound, then instead of throwing an exception after a few quick retries,
the evaluator now retries more times (HeartbeatMaxRetryForNonRecoveryMode, default 60) and exits once it reaches that max retry limit.
If an IDriverConnection is implemented, we also increase the max retry number (HeartbeatMaxRetry, default raised from 3 to 10)
before entering recovery mode, since a network glitch can last for more than a few seconds.
JIRA:
[REEF-1679](https://issues.apache.org/jira/browse/REEF-1679)
Pull request:
This closes #1193
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dd14f091
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dd14f091
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dd14f091
Branch: refs/heads/master
Commit: dd14f091050b9e8f22cff3627a8be686fd91350e
Parents: c0ddcc9
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Nov 29 18:34:38 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Fri Dec 2 13:59:23 2016 -0800
----------------------------------------------------------------------
.../Org.Apache.REEF.Common.csproj | 1 +
.../Runtime/Evaluator/EvaluatorSettings.cs | 18 ++++-
.../Runtime/Evaluator/HeartBeatManager.cs | 70 ++++++++++++++------
.../Evaluator/Parameters/HeartbeatMaxRetry.cs | 2 +-
.../HeartbeatMaxRetryForNonRecoveryMode.cs | 26 ++++++++
5 files changed, 94 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 8399e43..936bc01 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -199,6 +199,7 @@ under the License.
<Compile Include="Runtime\Evaluator\HeartBeatManager.cs" />
<Compile Include="Runtime\Evaluator\IHeartBeatManager.cs" />
<Compile Include="Runtime\Evaluator\Parameters\EvaluatorHeartbeatPeriodInMs.cs" />
+ <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetryForNonRecoveryMode.cs" />
<Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetry.cs" />
<Compile Include="Runtime\Evaluator\PIDStoreHandler.cs" />
<Compile Include="Runtime\Evaluator\ReefMessageProtoObserver.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
index 126ce02..cc58a5a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
@@ -35,6 +35,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
private readonly string _evaluatorId;
private readonly int _heartBeatPeriodInMs;
private readonly int _maxHeartbeatRetries;
+ private readonly int _maxHeartbeatRetriesForNonrecoveryMode;
private readonly IClock _clock;
private readonly IRemoteManager<REEFMessage> _remoteManager;
@@ -45,6 +46,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
/// <param name="evaluatorId"></param>
/// <param name="heartbeatPeriodInMs"></param>
/// <param name="maxHeartbeatRetries"></param>
+ /// <param name="maxHeartbeatRetriesForNonRecoveryMode">Max retry number for non HA mode</param>
/// <param name="clock"></param>
/// <param name="remoteManagerFactory"></param>
/// <param name="reefMessageCodec"></param>
@@ -54,10 +56,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
[Parameter(typeof(EvaluatorIdentifier))] string evaluatorId,
[Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs,
[Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries,
+ [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode,
IClock clock,
IRemoteManagerFactory remoteManagerFactory,
REEFMessageCodec reefMessageCodec) :
- this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries,
+ this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, maxHeartbeatRetriesForNonRecoveryMode,
clock, remoteManagerFactory, reefMessageCodec, null)
{
}
@@ -68,6 +71,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
[Parameter(typeof(EvaluatorIdentifier))] string evaluatorId,
[Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs,
[Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries,
+ [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode,
IClock clock,
IRemoteManagerFactory remoteManagerFactory,
REEFMessageCodec reefMessageCodec,
@@ -77,6 +81,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
_evaluatorId = evaluatorId;
_heartBeatPeriodInMs = heartbeatPeriodInMs;
_maxHeartbeatRetries = maxHeartbeatRetries;
+ _maxHeartbeatRetriesForNonrecoveryMode = maxHeartbeatRetriesForNonRecoveryMode;
_clock = clock;
_remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec);
@@ -134,6 +139,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
}
/// <summary>
+ /// Return MaxHeartbeatRetriesForNonrecoveryMode from NamedParameter
+ /// </summary>
+ public int MaxHeartbeatRetriesForNonRecoveryMode
+ {
+ get
+ {
+ return _maxHeartbeatRetriesForNonrecoveryMode;
+ }
+ }
+
+ /// <summary>
/// return Runtime Clock injected from the constructor
/// </summary>
public IClock RuntimeClock
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index 0f3fbd6..889c67c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -29,7 +29,6 @@ using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
-using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
@@ -55,6 +54,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
private readonly int _maxHeartbeatRetries = 0;
+ private readonly int _maxHeartbeatRetriesForNonRecoveryMode = 0;
+
private IRemoteIdentifier _remoteId;
private IObserver<REEFMessage> _observer;
@@ -95,6 +96,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
_clock = settings.RuntimeClock;
_heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
_maxHeartbeatRetries = settings.MaxHeartbeatRetries;
+ _maxHeartbeatRetriesForNonRecoveryMode = settings.MaxHeartbeatRetriesForNonRecoveryMode;
_driverConnection = driverConnection;
MachineStatus.ToString(); // kick start the CPU perf counter
}
@@ -152,7 +154,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
try
{
_observer.OnNext(payload);
- _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures
+ _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
}
catch (Exception e)
{
@@ -166,17 +168,35 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
_queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
- if (_heartbeatFailures >= _maxHeartbeatRetries)
+ if (_driverConnection.Get() is MissingDriverConnection)
{
- LOGGER.Log(Level.Warning, "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable", _heartbeatFailures);
- LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. ===========");
- ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected));
-
- LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
- _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
-
- // clean heartbeat failure
- _heartbeatFailures = 0;
+ if (_heartbeatFailures >= _maxHeartbeatRetriesForNonRecoveryMode)
+ {
+ var msg =
+ string.Format(CultureInfo.InvariantCulture,
+ "Have encountered {0} heartbeat failures. Limit of heartbeat sending failures exceeded. Driver reconnect logic is not implemented, failing evaluator.",
+ _heartbeatFailures);
+ LOGGER.Log(Level.Error, msg);
+ throw new ReefRuntimeException(msg, e);
+ }
+ }
+ else
+ {
+ if (_heartbeatFailures >= _maxHeartbeatRetries)
+ {
+ LOGGER.Log(Level.Warning,
+ "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable",
+ _heartbeatFailures);
+ LOGGER.Log(Level.Info, "Entering RECOVERY mode!!!");
+ ContextManager.HandleDriverConnectionMessage(
+ new DriverConnectionMessageImpl(DriverConnectionState.Disconnected));
+
+ LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+ _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+ // clean heartbeat failure
+ _heartbeatFailures = 0;
+ }
}
}
}
@@ -284,22 +304,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
{
var driverConnection = _driverConnection.Get();
-
try
{
var driverInformation = driverConnection.GetDriverInformation();
if (driverInformation == null)
{
- LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+ LOGGER.Log(Level.Verbose,
+ "In RECOVERY mode, cannot retrieve driver information, will try again later.");
}
else
{
- LOGGER.Log(
- Level.Info,
- string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection",
+ driverInformation.DriverStartTime,
+ driverInformation.DriverRemoteIdentifier,
+ driverInformation.NameServerId);
+ LOGGER.Log(Level.Info, msg);
Recover(driverInformation);
}
}
+ catch (NotImplementedException)
+ {
+ LOGGER.Log(Level.Error, "Reaching EvaluatorOperation RECOVERY mode, however, there is no IDriverConnection implemented for HA.");
+ throw;
+ }
catch (Exception e)
{
// we do not want any exception to stop the query for driver status
@@ -329,8 +357,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
private static long CurrentTimeMilliSeconds()
{
- // this is an implmenation to get current time milli second counted from Jan 1st, 1970
- // it is chose as such to be compatible with java implmentation
+ // this is an implementation to get current time in milliseconds counted from Jan 1st, 1970
+ // it is chosen as such to be compatible with Java implementation
DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
}
@@ -367,7 +395,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
{
if (firstHeartbeatInQueue)
{
- // first heartbeat is specially construted to include the recovery flag
+ // first heartbeat is specially constructed to include the recovery flag
EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
_observer.OnNext(new REEFMessage(recoveryHeartbeat));
@@ -394,7 +422,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
_evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected));
- LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+ LOGGER.Log(Level.Info, "Exiting RECOVERY mode!!!");
}
private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
index 2211a3e..bd836db 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters
{
- [NamedParameter(Documentation = "Heartbeat Max Retry", ShortName = "HeartbeatMaxRetry", DefaultValue = "3")]
+ [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver before evaluator enters recovery mode to reconnect with driver.", ShortName = "HeartbeatMaxRetry", DefaultValue = "10")]
internal sealed class HeartbeatMaxRetry : Name<int>
{
}
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
new file mode 100644
index 0000000..b0bc06c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
@@ -0,0 +1,26 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters
+{
+ [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver if driver reconnection logic is not implemented.", ShortName = "HeartbeatMaxRetryForNonRecovery", DefaultValue = "60")]
+ internal sealed class HeartbeatMaxRetryForNonRecoveryMode : Name<int>
+ {
+ }
+}