You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:20 UTC
[25/30] ode git commit: Cluster Enabled Simple Scheduler-2
Cluster Enabled Simple Scheduler-2
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/3f5ef53a
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/3f5ef53a
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/3f5ef53a
Branch: refs/heads/ODE-563
Commit: 3f5ef53ab9f248d3f443196bd96de6507ad94148
Parents: 15f1883
Author: suba <su...@cse.mrt.ac.lk>
Authored: Tue Jul 21 00:21:05 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Tue Jul 21 00:21:05 2015 +0530
----------------------------------------------------------------------
Rakefile | 2 +-
.../java/org/apache/ode/axis2/ODEServer.java | 5 +-
.../apache/ode/bpel/clapi/ClusterManager.java | 17 ++
.../ode/bpel/clapi/ClusterMemberListener.java | 29 +++
.../hazelcast/HazelcastClusterImpl.java | 36 ++--
.../ode/scheduler/simple/SchedulerListener.java | 27 ---
.../ode/scheduler/simple/SimpleScheduler.java | 185 +++++++++----------
7 files changed, 157 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 7c0fa67..5475227 100644
--- a/Rakefile
+++ b/Rakefile
@@ -208,7 +208,7 @@ define "ode" do
desc "ODE Clustering"
define "clustering" do
- compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
+ compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging
package :jar
end
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 6803350..222fedd 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -50,7 +50,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
-import org.omg.CORBA.StringHolder;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -198,7 +197,7 @@ public class ODEServer {
_store.loadAll();
if (_clusterManager != null) {
_clusterManager.registerClusterProcessStoreMessageListener();
- _clusterManager.setScheduler(_scheduler);
+ _clusterManager.registerClusterMemberListener(_scheduler);
}
try {
@@ -489,7 +488,7 @@ public class ODEServer {
Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
_clusterManager = (ClusterManager) clusterImplClass.newInstance();
} catch (Exception ex) {
- __log.error("Error while loading class : " +clusterImplName ,ex);
+ __log.error("Error while loading class : " + clusterImplName, ex);
}
_clusterManager.init(_configRoot);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 70d7c03..a00959a 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -19,6 +19,7 @@
package org.apache.ode.bpel.clapi;
import java.io.File;
+import java.util.List;
public interface ClusterManager {
@@ -57,6 +58,12 @@ public interface ClusterManager {
void registerClusterProcessStoreMessageListener();
/**
+ * Register Scheduler as ClusterMemberListener
+ * @param scheduler the scheduler to register; it is notified when cluster members are added or removed
+ */
+ void registerClusterMemberListener(Object scheduler);
+
+ /**
* Return deployment lock for cluster
*/
ClusterLock getDeploymentLock();
@@ -65,4 +72,14 @@ public interface ClusterManager {
* Return instance lock for cluster
*/
ClusterLock getInstanceLock();
+
+ /**
+ * Return active node list in the cluster
+ */
+ List<String> getActiveNodes();
+
+ /**
+ * Return local member's uuid in the cluster
+ */
+ String getUuid();
}
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
new file mode 100644
index 0000000..4225f7d
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public interface ClusterMemberListener {
+
+ void memberAdded(String nodeId);
+
+ void memberRemoved(String nodeId);
+
+ void memberElectedAsMaster();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 63a889a..971df3e 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,7 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.*;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
/**
* This class implements necessary methods to build the cluster using hazelcast
@@ -47,7 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager {
private IMap<Long, Long> instance_lock_map;
private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
private ClusterProcessStore _clusterProcessStore;
- private Scheduler _scheduler;
+ private SimpleScheduler _scheduler;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
@@ -77,7 +77,6 @@ public class HazelcastClusterImpl implements ClusterManager {
nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort();
uuid = localMember.getUuid();
__log.info("Registering HZ localMember ID " + nodeID);
- markAsMaster();
deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
@@ -93,15 +92,15 @@ public class HazelcastClusterImpl implements ClusterManager {
public void memberAdded(MembershipEvent membershipEvent) {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Added " +nodeId);
- if(isMaster) _simpleScheduler.memberAdded(nodeId);
+ _scheduler.memberAdded(nodeId);
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
- String nodeId = membershipEvent.getMember().getUuid();
- __log.info("Member Removed " +nodeId);
+ String nodeId = membershipEvent.getMember().getUuid();
+ __log.info("Member Removed " + nodeId);
markAsMaster();
- if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid);
+ _scheduler.memberRemoved(nodeId);
}
@Override
@@ -149,9 +148,9 @@ public class HazelcastClusterImpl implements ClusterManager {
private void markAsMaster() {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
- if (leader.localMember()) {
+ if (leader.localMember() && isMaster == false) {
isMaster = true;
- _simpleScheduler.setIsMasterNode(true);
+ _scheduler.memberElectedAsMaster();
}
__log.info(isMaster);
}
@@ -168,15 +167,16 @@ public class HazelcastClusterImpl implements ClusterManager {
_clusterProcessStore = store;
}
- public void setScheduler(Scheduler scheduler) {
- _scheduler = scheduler;
- _scheduler.setClusterManager(this);
- }
-
public void registerClusterProcessStoreMessageListener() {
clusterMessageTopic.addMessageListener(new ClusterMessageListener());
}
+ public void registerClusterMemberListener(Object scheduler) {
+ _scheduler = (SimpleScheduler) scheduler;
+ markAsMaster();
+ _scheduler.setClusterManager(this);
+ }
+
public void shutdown() {
if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown();
}
@@ -189,11 +189,11 @@ public class HazelcastClusterImpl implements ClusterManager {
return _hazelcastInstanceLock;
}
- public List<String> getKnownNodes() {
- List<String> nodesList = new ArrayList<String>();
+ public List<String> getActiveNodes() {
+ List<String> nodeList = new ArrayList<String>();
for(Member m : _hazelcastInstance.getCluster().getMembers())
- nodesList.add(m.getUuid()) ;
- return nodesList;
+ nodeList.add(m.getUuid()) ;
+ return nodeList;
}
}
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
deleted file mode 100644
index 3786912..0000000
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.scheduler.simple;
-
-public interface SchedulerListener {
-
- void memberAdded(String nodeId);
-
- void memberRemoved(String nodeId,String masterId);
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index 3b6ec4d..a0dbf5a 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -22,6 +22,7 @@ package org.apache.ode.scheduler.simple;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
@@ -52,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
-public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberListener {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -102,13 +103,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
private boolean _isClusterEnabled;
+ private String _masterId;
+
private ClusterManager _clusterManager;
- /** All the nodes we know about */
- private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
+ /** All the node ids loaded from the database. */
+ private CopyOnWriteArraySet<String> _dbNodes = new CopyOnWriteArraySet<String>();
+
+ /** All the stale nodes */
+ private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
+
+ /** All the nodes recorded as members are added to the cluster. */
+ private CopyOnWriteArraySet<String> _clusterNodes = new CopyOnWriteArraySet<String>();
/** When we last heard from our nodes. */
- private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+ //private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
/** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet.
Used to avoid cases where a job would be dispatched twice if the server is under high load and
@@ -460,13 +469,15 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
- _knownNodes.clear();
+ _dbNodes.clear();
+ _clusterNodes.clear();
+ _staleNodes.clear();
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
- _knownNodes.addAll(_db.getNodeIds());
+ _dbNodes.addAll(_db.getNodeIds());
return null;
}
@@ -475,21 +486,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
__log.error("Error retrieving node list.", ex);
throw new ContextException("Error retrieving node list.", ex);
}
+ _clusterNodes.add(_nodeId);
long now = System.currentTimeMillis();
// Pretend we got a heartbeat...
- for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+ //for (String s : _knownNodes) _lastHeartBeat.put(s, now);
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
// schedule check for stale nodes, make it random so that the nodes don't overlap.
- if (!_isClusterEnabled)
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
// do the upgrade sometime (random) in the immediate interval.
- enqueUpgradeJobsTask(now);
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
_todo.start();
_running = true;
@@ -517,16 +528,33 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
_running = false;
}
- public void memberAdded(final String nodeId) {
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis()+ randomMean(_immediateInterval)));
+ public void memberAdded(String nodeId) {
+ _clusterNodes.add(nodeId);
}
- public void memberRemoved(final String nodeId, final String masterId) {
- recoverClusterStaleNodes(nodeId, masterId);
+ public void memberRemoved(String nodeId) {
+ _staleNodes.add(nodeId);
}
- public void enqueUpgradeJobsTask(long now) {
- _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ // Enqueue CheckStaleNodes and UpgradeJobsTask once this node has been elected master.
+ public void memberElectedAsMaster() {
+ _masterId = _nodeId;
+ _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval)));
+ _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval)));
+ _dbNodes.clear();
+ try {
+ execTransaction(new Callable<Void>() {
+
+ public Void call() throws Exception {
+ _dbNodes.addAll(_db.getNodeIds());
+ return null;
+ }
+
+ });
+ } catch (Exception ex) {
+ __log.error("Error retrieving node list.", ex);
+ throw new ContextException("Error retrieving node list.", ex);
+ }
}
class RunJob implements Callable<Void> {
@@ -701,7 +729,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
}
}
- public void updateHeartBeat(String nodeId) {
+ /*public void updateHeartBeat(String nodeId) {
if (nodeId == null)
return;
@@ -710,7 +738,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
_lastHeartBeat.put(nodeId, System.currentTimeMillis());
_knownNodes.add(nodeId);
- }
+ }*/
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
@@ -788,10 +816,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
boolean doUpgrade() {
__log.debug("UPGRADE started");
- final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
- // Don't forget about self.
- knownNodes.add(_nodeId);
- Collections.sort(knownNodes);
+ final ArrayList<String> activeNodes;
+
+ // for cluster mode
+ if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+ activeNodes = (ArrayList) _clusterManager.getActiveNodes();
+ }
+ //for standalone ODE deployments
+ else {
+ activeNodes = new ArrayList<String>();
+ activeNodes.add(_nodeId);
+ }
+
+ Collections.sort(activeNodes);
// We're going to try to upgrade near future jobs using the db only.
// We assume that the distribution of the trailing digits in the
@@ -803,9 +840,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
return execTransaction(new Callable<Boolean>() {
public Boolean call() throws Exception {
- int numNodes = knownNodes.size();
+ int numNodes = activeNodes.size();
for (int i = 0; i < numNodes; ++i) {
- String node = knownNodes.get(i);
+ String node = activeNodes.get(i);
_db.updateAssignToNode(node, i, numNodes, maxtime);
}
return true;
@@ -822,41 +859,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
}
- boolean doClusterJobsUpgrade() {
- __log.debug("UPGRADE started for Cluster Mode");
- final ArrayList<String> knownNodes = _clusterManager.getKnownNodes();
- Collections.sort(knownNodes);
-
- // We're going to try to upgrade near future jobs using the db only.
- // We assume that the distribution of the trailing digits in the
- // scheduled time are uniformly distributed, and use modular division
- // of the time by the number of nodes to create the node assignment.
- // This can be done in a single update statement.
- final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
- try {
- return execTransaction(new Callable<Boolean>() {
-
- public Boolean call() throws Exception {
- int numNodes = knownNodes.size();
- for (int i = 0; i < numNodes; ++i) {
- String node = knownNodes.get(i);
- _db.updateAssignToNode(node, i, numNodes, maxtime);
- }
- return true;
- }
-
- });
-
- } catch (Exception ex) {
- __log.error("Database error upgrading jobs.", ex);
- return false;
- } finally {
- __log.debug("UPGRADE complete");
- }
-
- }
-
-
/**
* Re-assign stale node's jobs to self.
* @param nodeId
@@ -876,10 +878,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
__log.debug("reassigned " + numrows + " jobs to self. ");
}
- // We can now forget about this node, if we see it again, it will be
- // "new to us"
- _knownNodes.remove(nodeId);
- _lastHeartBeat.remove(nodeId);
+ if(_isClusterEnabled) _staleNodes.remove(nodeId);
+
+ // If the stale node id is in _clusterNodes or _dbNodes, remove it.
+ _clusterNodes.remove(nodeId);
+ _dbNodes.remove(nodeId);
// Force a load-immediate to catch anything new from the recovered node.
doLoadImmediate();
@@ -900,31 +903,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
// return delay;
// }
- void recoverClusterStaleNodes(final String nodeId, final String masterId) {
- if (__log.isDebugEnabled()) {
- __log.debug("recovering stale nodes for Cluster Mode " + nodeId);
- }
- try {
- int numrows = execTransaction(new Callable<Integer>() {
- public Integer call() throws Exception {
- return _db.updateReassign(nodeId, masterId);
- }
- });
-
- if (__log.isDebugEnabled()) {
- __log.debug("reassigned " + numrows + " jobs to master node. ");
- }
-
- // Force a load-immediate to catch anything new from the recovered node.
- doLoadImmediate();
-
- } catch (Exception ex) {
- __log.error("Database error reassigning node.", ex);
- } finally {
- __log.debug("node recovery complete");
- }
- }
-
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
super(schedDate);
@@ -979,8 +957,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
boolean success = false;
try {
- if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade();
- else success = doUpgrade();
+ success = doUpgrade();
} finally {
long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
_nextUpgrade.set(future);
@@ -1003,14 +980,32 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener
public void run() {
_todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
__log.debug("CHECK STALE NODES started");
- for (String nodeId : _knownNodes) {
- Long lastSeen = _lastHeartBeat.get(nodeId);
- if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
- && !_nodeId.equals(nodeId))
- {
+
+ ArrayList<String> knownNodes = new ArrayList<String>();
+ knownNodes.addAll(_dbNodes);
+ knownNodes.addAll(_clusterNodes);
+
+ // for cluster mode
+ if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+ ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
+
+ //find stale nodes
+ knownNodes.removeAll(memberList);
+ if (knownNodes.size() != 0) {
+ for (String nodeId : knownNodes) {
+ _staleNodes.add(nodeId);
+ }
+ }
+ for (String nodeId : _staleNodes) {
recoverStaleNode(nodeId);
}
}
+ // for standalone ode node
+ else {
+ for (String nodeId : knownNodes) {
+ if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
+ }
+ }
}
}