You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:20 UTC

[25/30] ode git commit: Cluster Enabled Simple Scheduler-2

Cluster Enabled Simple Scheduler-2


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/3f5ef53a
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/3f5ef53a
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/3f5ef53a

Branch: refs/heads/ODE-563
Commit: 3f5ef53ab9f248d3f443196bd96de6507ad94148
Parents: 15f1883
Author: suba <su...@cse.mrt.ac.lk>
Authored: Tue Jul 21 00:21:05 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Tue Jul 21 00:21:05 2015 +0530

----------------------------------------------------------------------
 Rakefile                                        |   2 +-
 .../java/org/apache/ode/axis2/ODEServer.java    |   5 +-
 .../apache/ode/bpel/clapi/ClusterManager.java   |  17 ++
 .../ode/bpel/clapi/ClusterMemberListener.java   |  29 +++
 .../hazelcast/HazelcastClusterImpl.java         |  36 ++--
 .../ode/scheduler/simple/SchedulerListener.java |  27 ---
 .../ode/scheduler/simple/SimpleScheduler.java   | 185 +++++++++----------
 7 files changed, 157 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 7c0fa67..5475227 100644
--- a/Rakefile
+++ b/Rakefile
@@ -208,7 +208,7 @@ define "ode" do
 
   desc "ODE Clustering"
    define "clustering" do
-     compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
+     compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging
      package :jar
    end
 

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 6803350..222fedd 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -50,7 +50,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
-import org.omg.CORBA.StringHolder;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -198,7 +197,7 @@ public class ODEServer {
         _store.loadAll();
         if (_clusterManager != null) {
             _clusterManager.registerClusterProcessStoreMessageListener();
-            _clusterManager.setScheduler(_scheduler);
+            _clusterManager.registerClusterMemberListener(_scheduler);
         }
 
         try {
@@ -489,7 +488,7 @@ public class ODEServer {
             Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
             _clusterManager = (ClusterManager) clusterImplClass.newInstance();
         } catch (Exception ex) {
-            __log.error("Error while loading class : " +clusterImplName ,ex);
+            __log.error("Error while loading class : " + clusterImplName, ex);
         }
         _clusterManager.init(_configRoot);
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 70d7c03..a00959a 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -19,6 +19,7 @@
 package org.apache.ode.bpel.clapi;
 
 import java.io.File;
+import java.util.List;
 
 public interface ClusterManager {
 
@@ -57,6 +58,12 @@ public interface ClusterManager {
     void registerClusterProcessStoreMessageListener();
 
     /**
+     * Register Scheduler as ClusterMemberListener
+     * @param scheduler
+     */
+    void registerClusterMemberListener(Object scheduler);
+
+    /**
      * Return deployment lock for cluster
      */
     ClusterLock getDeploymentLock();
@@ -65,4 +72,14 @@ public interface ClusterManager {
      * Return instance lock for cluster
      */
     ClusterLock getInstanceLock();
+
+    /**
+     * Return active node list in the cluster
+     */
+    List<String> getActiveNodes();
+
+    /**
+     * Return local member's uuid in the cluster
+     */
+    String getUuid();
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
new file mode 100644
index 0000000..4225f7d
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public interface ClusterMemberListener {
+
+    void memberAdded(String nodeId);
+
+    void memberRemoved(String nodeId);
+
+    void memberElectedAsMaster();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 63a889a..971df3e 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,7 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.ode.bpel.clapi.*;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
 
 /**
  * This class implements necessary methods to build the cluster using hazelcast
@@ -47,7 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager {
     private IMap<Long, Long> instance_lock_map;
     private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
     private ClusterProcessStore _clusterProcessStore;
-    private Scheduler _scheduler;
+    private SimpleScheduler _scheduler;
     private ClusterLock<String> _hazelcastDeploymentLock;
     private ClusterLock<Long> _hazelcastInstanceLock;
 
@@ -77,7 +77,6 @@ public class HazelcastClusterImpl implements ClusterManager {
             nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort();
             uuid = localMember.getUuid();
             __log.info("Registering HZ localMember ID " + nodeID);
-            markAsMaster();
 
             deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
             instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
@@ -93,15 +92,15 @@ public class HazelcastClusterImpl implements ClusterManager {
         public void memberAdded(MembershipEvent membershipEvent) {
             String nodeId =  membershipEvent.getMember().getUuid();
             __log.info("Member Added " +nodeId);
-            if(isMaster) _simpleScheduler.memberAdded(nodeId);
+            _scheduler.memberAdded(nodeId);
         }
 
         @Override
         public void memberRemoved(MembershipEvent membershipEvent) {
-            String nodeId =  membershipEvent.getMember().getUuid();
-            __log.info("Member Removed " +nodeId);
+            String nodeId = membershipEvent.getMember().getUuid();
+            __log.info("Member Removed " + nodeId);
             markAsMaster();
-            if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid);
+            _scheduler.memberRemoved(nodeId);
         }
 
         @Override
@@ -149,9 +148,9 @@ public class HazelcastClusterImpl implements ClusterManager {
 
     private void markAsMaster() {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
-        if (leader.localMember()) {
+        if (leader.localMember() && isMaster == false) {
             isMaster = true;
-            _simpleScheduler.setIsMasterNode(true);
+            _scheduler.memberElectedAsMaster();
         }
         __log.info(isMaster);
     }
@@ -168,15 +167,16 @@ public class HazelcastClusterImpl implements ClusterManager {
         _clusterProcessStore = store;
     }
 
-    public void setScheduler(Scheduler scheduler) {
-        _scheduler = scheduler;
-        _scheduler.setClusterManager(this);
-    }
-
     public void registerClusterProcessStoreMessageListener() {
         clusterMessageTopic.addMessageListener(new ClusterMessageListener());
     }
 
+    public void registerClusterMemberListener(Object scheduler) {
+        _scheduler = (SimpleScheduler) scheduler;
+        markAsMaster();
+        _scheduler.setClusterManager(this);
+    }
+
     public void shutdown() {
         if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown();
     }
@@ -189,11 +189,11 @@ public class HazelcastClusterImpl implements ClusterManager {
         return _hazelcastInstanceLock;
     }
 
-    public List<String> getKnownNodes() {
-        List<String> nodesList = new ArrayList<String>();
+    public List<String> getActiveNodes() {
+        List<String> nodeList = new ArrayList<String>();
         for(Member m : _hazelcastInstance.getCluster().getMembers())
-          nodesList.add(m.getUuid()) ;
-        return nodesList;
+          nodeList.add(m.getUuid()) ;
+        return nodeList;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
deleted file mode 100644
index 3786912..0000000
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.scheduler.simple;
-
-public interface SchedulerListener {
-
-    void memberAdded(String nodeId);
-
-    void memberRemoved(String nodeId,String masterId);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index 3b6ec4d..a0dbf5a 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -22,6 +22,7 @@ package org.apache.ode.scheduler.simple;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 
@@ -52,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
-public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberListener {
     private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
 
     private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -102,13 +103,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 
     private boolean _isClusterEnabled;
 
+    private String _masterId;
+
     private ClusterManager _clusterManager;
 
-    /** All the nodes we know about */
-    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
+    /** All the nodes which are taken from the database*/
+    private CopyOnWriteArraySet<String> _dbNodes = new CopyOnWriteArraySet<String>();
+
+    /** All the stale nodes */
+    private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
+
+    /** All the nodes when members are added to the cluster*/
+    private CopyOnWriteArraySet<String> _clusterNodes = new CopyOnWriteArraySet<String>();
 
     /** When we last heard from our nodes. */
-    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+    //private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
 
     /** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet.
         Used to avoid cases where a job would be dispatched twice if the server is under high load and
@@ -460,13 +469,15 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
         _processedSinceLastLoadTask.clear();
         _outstandingJobs.clear();
 
-        _knownNodes.clear();
+        _dbNodes.clear();
+        _clusterNodes.clear();
+        _staleNodes.clear();
 
         try {
             execTransaction(new Callable<Void>() {
 
                 public Void call() throws Exception {
-                    _knownNodes.addAll(_db.getNodeIds());
+                    _dbNodes.addAll(_db.getNodeIds());
                     return null;
                 }
 
@@ -475,21 +486,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
             __log.error("Error retrieving node list.", ex);
             throw new ContextException("Error retrieving node list.", ex);
         }
+        _clusterNodes.add(_nodeId);
 
         long now = System.currentTimeMillis();
 
         // Pretend we got a heartbeat...
-        for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+        //for (String s : _knownNodes) _lastHeartBeat.put(s, now);
 
         // schedule immediate job loading for now!
         _todo.enqueue(new LoadImmediateTask(now));
 
         // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        if (!_isClusterEnabled)
-            _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
 
         // do the upgrade sometime (random) in the immediate interval.
-        enqueUpgradeJobsTask(now);
+        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
 
         _todo.start();
         _running = true;
@@ -517,16 +528,33 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
         _running = false;
     }
 
-    public void memberAdded(final String nodeId) {
-        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis()+ randomMean(_immediateInterval)));
+    public void memberAdded(String nodeId) {
+        _clusterNodes.add(nodeId);
     }
 
-    public void memberRemoved(final String nodeId, final String masterId) {
-        recoverClusterStaleNodes(nodeId, masterId);
+    public void memberRemoved(String nodeId) {
+        _staleNodes.add(nodeId);
     }
 
-    public void enqueUpgradeJobsTask(long now) {
-        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+    // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
+    public void memberElectedAsMaster() {
+        _masterId = _nodeId;
+        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval)));
+        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval)));
+        _dbNodes.clear();
+        try {
+            execTransaction(new Callable<Void>() {
+
+                public Void call() throws Exception {
+                    _dbNodes.addAll(_db.getNodeIds());
+                    return null;
+                }
+
+            });
+        } catch (Exception ex) {
+            __log.error("Error retrieving node list.", ex);
+            throw new ContextException("Error retrieving node list.", ex);
+        }
     }
 
     class RunJob implements Callable<Void> {
@@ -701,7 +729,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
         }
     }
 
-    public void updateHeartBeat(String nodeId) {
+    /*public void updateHeartBeat(String nodeId) {
         if (nodeId == null)
             return;
 
@@ -710,7 +738,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 
         _lastHeartBeat.put(nodeId, System.currentTimeMillis());
         _knownNodes.add(nodeId);
-    }
+    }*/
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
@@ -788,10 +816,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 
     boolean doUpgrade() {
         __log.debug("UPGRADE started");
-        final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
-        // Don't forget about self.
-        knownNodes.add(_nodeId);
-        Collections.sort(knownNodes);
+        final ArrayList<String> activeNodes;
+
+        // for cluster mode
+        if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+            activeNodes = (ArrayList) _clusterManager.getActiveNodes();
+        }
+        //for standalone ODE deployments
+        else {
+            activeNodes = new ArrayList<String>();
+            activeNodes.add(_nodeId);
+        }
+
+        Collections.sort(activeNodes);
 
         // We're going to try to upgrade near future jobs using the db only.
         // We assume that the distribution of the trailing digits in the
@@ -803,9 +840,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
             return execTransaction(new Callable<Boolean>() {
 
                 public Boolean call() throws Exception {
-                    int numNodes = knownNodes.size();
+                    int numNodes = activeNodes.size();
                     for (int i = 0; i < numNodes; ++i) {
-                        String node = knownNodes.get(i);
+                        String node = activeNodes.get(i);
                         _db.updateAssignToNode(node, i, numNodes, maxtime);
                     }
                     return true;
@@ -822,41 +859,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 
     }
 
-    boolean doClusterJobsUpgrade() {
-            __log.debug("UPGRADE started for Cluster Mode");
-            final ArrayList<String> knownNodes = _clusterManager.getKnownNodes();
-            Collections.sort(knownNodes);
-
-            // We're going to try to upgrade near future jobs using the db only.
-            // We assume that the distribution of the trailing digits in the
-            // scheduled time are uniformly distributed, and use modular division
-            // of the time by the number of nodes to create the node assignment.
-            // This can be done in a single update statement.
-            final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
-            try {
-                return execTransaction(new Callable<Boolean>() {
-
-                    public Boolean call() throws Exception {
-                        int numNodes = knownNodes.size();
-                        for (int i = 0; i < numNodes; ++i) {
-                            String node = knownNodes.get(i);
-                            _db.updateAssignToNode(node, i, numNodes, maxtime);
-                        }
-                        return true;
-                    }
-
-                });
-
-            } catch (Exception ex) {
-                __log.error("Database error upgrading jobs.", ex);
-                return false;
-            } finally {
-                __log.debug("UPGRADE complete");
-            }
-
-        }
-
-
     /**
      * Re-assign stale node's jobs to self.
      * @param nodeId
@@ -876,10 +878,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
                 __log.debug("reassigned " + numrows + " jobs to self. ");
             }
 
-            // We can now forget about this node, if we see it again, it will be
-            // "new to us"
-            _knownNodes.remove(nodeId);
-            _lastHeartBeat.remove(nodeId);
+            if(_isClusterEnabled) _staleNodes.remove(nodeId);
+
+            // If the stale node id is in _clusterNodes or _dbNodes, remove it.
+            _clusterNodes.remove(nodeId);
+            _dbNodes.remove(nodeId);
 
             // Force a load-immediate to catch anything new from the recovered node.
             doLoadImmediate();
@@ -900,31 +903,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 //        return delay;
 //    }
 
-    void recoverClusterStaleNodes(final String nodeId, final String masterId) {
-        if (__log.isDebugEnabled()) {
-            __log.debug("recovering stale nodes for Cluster Mode " + nodeId);
-        }
-        try {
-            int numrows = execTransaction(new Callable<Integer>() {
-                public Integer call() throws Exception {
-                    return _db.updateReassign(nodeId, masterId);
-                }
-            });
-
-            if (__log.isDebugEnabled()) {
-                __log.debug("reassigned " + numrows + " jobs to master node. ");
-            }
-
-            // Force a load-immediate to catch anything new from the recovered node.
-            doLoadImmediate();
-
-        } catch (Exception ex) {
-            __log.error("Database error reassigning node.", ex);
-        } finally {
-            __log.debug("node recovery complete");
-        }
-    }
-
     private abstract class SchedulerTask extends Task implements Runnable {
         SchedulerTask(long schedDate) {
             super(schedDate);
@@ -979,8 +957,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
 
             boolean success = false;
             try {
-               if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade();
-                else success = doUpgrade();
+              success = doUpgrade();
             } finally {
                 long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
                 _nextUpgrade.set(future);
@@ -1003,14 +980,32 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
         public void run() {
             _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
             __log.debug("CHECK STALE NODES started");
-            for (String nodeId : _knownNodes) {
-                Long lastSeen = _lastHeartBeat.get(nodeId);
-                if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
-                    && !_nodeId.equals(nodeId))
-                {
+
+            ArrayList<String> knownNodes = new ArrayList<String>();
+            knownNodes.addAll(_dbNodes);
+            knownNodes.addAll(_clusterNodes);
+
+            // for cluster mode
+            if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+                ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
+
+                //find stale nodes
+                knownNodes.removeAll(memberList);
+                if (knownNodes.size() != 0) {
+                    for (String nodeId : knownNodes) {
+                        _staleNodes.add(nodeId);
+                    }
+                }
+                for (String nodeId : _staleNodes)  {
                     recoverStaleNode(nodeId);
                 }
             }
+            // for standalone ode node
+            else {
+                for (String nodeId : knownNodes) {
+                    if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
+                }
+            }
         }
     }