You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/02 22:00:02 UTC

reef git commit: [REEF-1679] Evaluator shouldn't go to recovery mode if there is no reconnect logic provided

Repository: reef
Updated Branches:
  refs/heads/master c0ddcc9c9 -> dd14f0910


[REEF-1679] Evaluator shouldn't go to recovery mode if there is no reconnect logic provided

If no IDriverConnection is bound, then instead of throwing an exception after a few quick retries,
the evaluator now allows more retries (HeartbeatMaxRetryForNonRecoveryMode, default 60) and fails
once that limit is reached, without entering recovery mode. If an IDriverConnection is implemented,
the max number of retries before entering recovery mode (HeartbeatMaxRetry) is raised from 3 to 10,
since a network glitch can last for more than a few seconds.

JIRA:
  [REEF-1679](https://issues.apache.org/jira/browse/REEF-1679)

Pull request:
  This closes #1193


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dd14f091
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dd14f091
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dd14f091

Branch: refs/heads/master
Commit: dd14f091050b9e8f22cff3627a8be686fd91350e
Parents: c0ddcc9
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Nov 29 18:34:38 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Fri Dec 2 13:59:23 2016 -0800

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |  1 +
 .../Runtime/Evaluator/EvaluatorSettings.cs      | 18 ++++-
 .../Runtime/Evaluator/HeartBeatManager.cs       | 70 ++++++++++++++------
 .../Evaluator/Parameters/HeartbeatMaxRetry.cs   |  2 +-
 .../HeartbeatMaxRetryForNonRecoveryMode.cs      | 26 ++++++++
 5 files changed, 94 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 8399e43..936bc01 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -199,6 +199,7 @@ under the License.
     <Compile Include="Runtime\Evaluator\HeartBeatManager.cs" />
     <Compile Include="Runtime\Evaluator\IHeartBeatManager.cs" />
     <Compile Include="Runtime\Evaluator\Parameters\EvaluatorHeartbeatPeriodInMs.cs" />
+    <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetryForNonRecoveryMode.cs" />
     <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetry.cs" />
     <Compile Include="Runtime\Evaluator\PIDStoreHandler.cs" />
     <Compile Include="Runtime\Evaluator\ReefMessageProtoObserver.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
index 126ce02..cc58a5a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
@@ -35,6 +35,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         private readonly string _evaluatorId;
         private readonly int _heartBeatPeriodInMs;
         private readonly int _maxHeartbeatRetries;
+        private readonly int _maxHeartbeatRetriesForNonrecoveryMode;
         private readonly IClock _clock;
         private readonly IRemoteManager<REEFMessage> _remoteManager;
 
@@ -45,6 +46,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         /// <param name="evaluatorId"></param>
         /// <param name="heartbeatPeriodInMs"></param>
         /// <param name="maxHeartbeatRetries"></param>
+        /// <param name="maxHeartbeatRetriesForNonRecoveryMode">Max retry number for non HA mode</param>
         /// <param name="clock"></param>
         /// <param name="remoteManagerFactory"></param>
         /// <param name="reefMessageCodec"></param>
@@ -54,10 +56,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId,
             [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs,
             [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries,
+            [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode,
             IClock clock,
             IRemoteManagerFactory remoteManagerFactory,
             REEFMessageCodec reefMessageCodec) :
-            this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, 
+            this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, maxHeartbeatRetriesForNonRecoveryMode,
             clock, remoteManagerFactory, reefMessageCodec, null)
         {
         }
@@ -68,6 +71,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId,
             [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs,
             [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries,
+            [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode,
             IClock clock,
             IRemoteManagerFactory remoteManagerFactory,
             REEFMessageCodec reefMessageCodec,
@@ -77,6 +81,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             _evaluatorId = evaluatorId;
             _heartBeatPeriodInMs = heartbeatPeriodInMs;
             _maxHeartbeatRetries = maxHeartbeatRetries;
+            _maxHeartbeatRetriesForNonrecoveryMode = maxHeartbeatRetriesForNonRecoveryMode;
             _clock = clock;
 
             _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec);
@@ -134,6 +139,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         }
 
         /// <summary>
+        /// Return MaxHeartbeatRetriesForNonrecoveryMode from NamedParameter
+        /// </summary>
+        public int MaxHeartbeatRetriesForNonRecoveryMode
+        {
+            get
+            {
+                return _maxHeartbeatRetriesForNonrecoveryMode;
+            }
+        }
+
+        /// <summary>
         /// return Runtime Clock injected from the constructor
         /// </summary>
         public IClock RuntimeClock

http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index 0f3fbd6..889c67c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -29,7 +29,6 @@ using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
-using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
@@ -55,6 +54,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
 
         private readonly int _maxHeartbeatRetries = 0;
 
+        private readonly int _maxHeartbeatRetriesForNonRecoveryMode = 0;
+
         private IRemoteIdentifier _remoteId;
 
         private IObserver<REEFMessage> _observer;
@@ -95,6 +96,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 _clock = settings.RuntimeClock;
                 _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
                 _maxHeartbeatRetries = settings.MaxHeartbeatRetries;
+                _maxHeartbeatRetriesForNonRecoveryMode = settings.MaxHeartbeatRetriesForNonRecoveryMode;
                 _driverConnection = driverConnection;
                 MachineStatus.ToString(); // kick start the CPU perf counter
             }
@@ -152,7 +154,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 try
                 {
                     _observer.OnNext(payload);
-                    _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures
+                    _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
                 }
                 catch (Exception e)
                 {
@@ -166,17 +168,35 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                     _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
                     LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
 
-                    if (_heartbeatFailures >= _maxHeartbeatRetries)
+                    if (_driverConnection.Get() is MissingDriverConnection)
                     {
-                        LOGGER.Log(Level.Warning, "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable", _heartbeatFailures);
-                        LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. ===========");
-                        ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected));
-
-                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
-                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
-
-                        // clean heartbeat failure
-                        _heartbeatFailures = 0;
+                        if (_heartbeatFailures >= _maxHeartbeatRetriesForNonRecoveryMode)
+                        {
+                            var msg =
+                                string.Format(CultureInfo.InvariantCulture,
+                                    "Have encountered {0} heartbeat failures. Limit of heartbeat sending failures exceeded. Driver reconnect logic is not implemented, failing evaluator.",
+                                    _heartbeatFailures);
+                            LOGGER.Log(Level.Error, msg);
+                            throw new ReefRuntimeException(msg, e);
+                        }
+                    }
+                    else
+                    {
+                        if (_heartbeatFailures >= _maxHeartbeatRetries)
+                        {
+                            LOGGER.Log(Level.Warning,
+                                "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable",
+                                _heartbeatFailures);
+                            LOGGER.Log(Level.Info, "Entering RECOVERY mode!!!");
+                            ContextManager.HandleDriverConnectionMessage(
+                                new DriverConnectionMessageImpl(DriverConnectionState.Disconnected));
+
+                            LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+                            _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+                            // clean heartbeat failure
+                            _heartbeatFailures = 0;
+                        }
                     }
                 }
             }     
@@ -284,22 +304,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                     if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
                     {
                         var driverConnection = _driverConnection.Get();
-
                         try
                         {
                             var driverInformation = driverConnection.GetDriverInformation();
                             if (driverInformation == null)
                             {
-                                LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                                LOGGER.Log(Level.Verbose,
+                                    "In RECOVERY mode, cannot retrieve driver information, will try again later.");
                             }
                             else
                             {
-                                LOGGER.Log(
-                                    Level.Info,
-                                    string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                                var msg = string.Format(CultureInfo.InvariantCulture,
+                                        "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection",
+                                        driverInformation.DriverStartTime,
+                                        driverInformation.DriverRemoteIdentifier,
+                                        driverInformation.NameServerId);
+                                LOGGER.Log(Level.Info, msg);
                                 Recover(driverInformation);
                             }
                         }
+                        catch (NotImplementedException)
+                        {
+                            LOGGER.Log(Level.Error, "Reaching EvaluatorOperation RECOVERY mode, however, there is no IDriverConnection implemented for HA.");
+                            throw;
+                        }
                         catch (Exception e)
                         {
                             // we do not want any exception to stop the query for driver status
@@ -329,8 +357,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
 
         private static long CurrentTimeMilliSeconds()
         {
-            // this is an implmenation to get current time milli second counted from Jan 1st, 1970
-            // it is chose as such to be compatible with java implmentation
+            // this is an implementation to get current time in milliseconds counted from Jan 1st, 1970
+            // it is chosen as such to be compatible with Java implementation
             DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
             return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
         }
@@ -367,7 +395,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                     {
                         if (firstHeartbeatInQueue)
                         {
-                            // first heartbeat is specially construted to include the recovery flag
+                            // first heartbeat is specially constructed to include the recovery flag
                             EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
                             LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
                             _observer.OnNext(new REEFMessage(recoveryHeartbeat));
@@ -394,7 +422,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
             ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected));
 
-            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+            LOGGER.Log(Level.Info, "Exiting RECOVERY mode!!!");
         }
 
         private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)

http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
index 2211a3e..bd836db 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters
 {
-    [NamedParameter(Documentation = "Heartbeat Max Retry", ShortName = "HeartbeatMaxRetry", DefaultValue = "3")]
+    [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver before evaluator enters recovery mode to reconnect with driver.", ShortName = "HeartbeatMaxRetry", DefaultValue = "10")]
     internal sealed class HeartbeatMaxRetry : Name<int>
     {
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
new file mode 100644
index 0000000..b0bc06c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs
@@ -0,0 +1,26 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters
+{
+    [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver if driver reconnection logic is not implemented.", ShortName = "HeartbeatMaxRetryForNonRecovery", DefaultValue = "60")]
+    internal sealed class HeartbeatMaxRetryForNonRecoveryMode : Name<int>
+    {
+    }
+}