You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:21 UTC
[26/30] ode git commit: Cluster Enabled Simple Scheduler-3
Cluster Enabled Simple Scheduler-3
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/43a8df89
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/43a8df89
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/43a8df89
Branch: refs/heads/ODE-563
Commit: 43a8df89b9b05c5831cc5fd4d385b018094f7429
Parents: 3f5ef53
Author: suba <su...@cse.mrt.ac.lk>
Authored: Thu Jul 23 15:52:25 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Thu Jul 23 15:52:25 2015 +0530
----------------------------------------------------------------------
Rakefile | 2 +-
.../java/org/apache/ode/axis2/ODEServer.java | 15 ++--
.../apache/ode/bpel/clapi/ClusterManager.java | 4 +-
.../ode/bpel/clapi/ClusterMemberListener.java | 2 +-
.../org/apache/ode/test/BPELTestAbstract.java | 2 +-
.../hazelcast/HazelcastClusterImpl.java | 17 ++--
.../java/org/apache/ode/jbi/OdeLifeCycle.java | 2 +-
.../ode/scheduler/simple/SimpleScheduler.java | 88 +++++++++++---------
8 files changed, 71 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 5475227..7c0fa67 100644
--- a/Rakefile
+++ b/Rakefile
@@ -208,7 +208,7 @@ define "ode" do
desc "ODE Clustering"
define "clustering" do
- compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging
+ compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
package :jar
end
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 222fedd..b3f5d2f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -31,6 +31,7 @@ import org.apache.ode.axis2.service.DeploymentWebService;
import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -197,7 +198,9 @@ public class ODEServer {
_store.loadAll();
if (_clusterManager != null) {
_clusterManager.registerClusterProcessStoreMessageListener();
- _clusterManager.registerClusterMemberListener(_scheduler);
+ if (_scheduler instanceof SimpleScheduler) {
+ _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+ }
}
try {
@@ -527,10 +530,12 @@ public class ODEServer {
}
protected Scheduler createScheduler() {
- String nodeId;
- if (isClusteringEnabled) nodeId = _clusterManager.getUuid();
- else nodeId = new GUID().toString();
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+ SimpleScheduler scheduler;
+ if (isClusteringEnabled) {
+ scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+ scheduler.setClusterManager(_clusterManager);
+ } else
+ scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
scheduler.setExecutorService(_executorService);
scheduler.setTransactionManager(_txMgr);
return scheduler;
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index a00959a..07d3d8d 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -59,9 +59,9 @@ public interface ClusterManager {
/**
* Register Scheduler as ClusterMemberListener
- * @param scheduler
+ * @param listener
*/
- void registerClusterMemberListener(Object scheduler);
+ void registerClusterMemberListener(ClusterMemberListener listener);
/**
* Return deployment lock for cluster
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
index 4225f7d..541ab9c 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -24,6 +24,6 @@ public interface ClusterMemberListener {
void memberRemoved(String nodeId);
- void memberElectedAsMaster();
+ void memberElectedAsMaster(String masterId);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
----------------------------------------------------------------------
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index cdda50e..00bdf7d 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -128,7 +128,7 @@ public abstract class BPELTestAbstract {
{
JdbcDelegate del = new JdbcDelegate(_dataSource);
- scheduler = new SimpleScheduler("node", del, props,false);
+ scheduler = new SimpleScheduler("node", del, props);
scheduler.setTransactionManager(_txManager);
_cf = new BpelDAOConnectionFactoryImpl(scheduler);
_server.setDaoConnectionFactory(_cf);
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 971df3e..f68068a 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.*;
-import org.apache.ode.scheduler.simple.SimpleScheduler;
/**
* This class implements necessary methods to build the cluster using hazelcast
@@ -47,7 +46,7 @@ public class HazelcastClusterImpl implements ClusterManager {
private IMap<Long, Long> instance_lock_map;
private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
private ClusterProcessStore _clusterProcessStore;
- private SimpleScheduler _scheduler;
+ private ClusterMemberListener _listener;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
@@ -78,6 +77,8 @@ public class HazelcastClusterImpl implements ClusterManager {
uuid = localMember.getUuid();
__log.info("Registering HZ localMember ID " + nodeID);
+ markAsMaster();
+
deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
@@ -92,7 +93,7 @@ public class HazelcastClusterImpl implements ClusterManager {
public void memberAdded(MembershipEvent membershipEvent) {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Added " +nodeId);
- _scheduler.memberAdded(nodeId);
+ if(isMaster && _listener != null) _listener.memberAdded(nodeId);
}
@Override
@@ -100,7 +101,7 @@ public class HazelcastClusterImpl implements ClusterManager {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Removed " + nodeId);
markAsMaster();
- _scheduler.memberRemoved(nodeId);
+ if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
}
@Override
@@ -150,7 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember() && isMaster == false) {
isMaster = true;
- _scheduler.memberElectedAsMaster();
+ if(_listener != null) _listener.memberElectedAsMaster(uuid);
}
__log.info(isMaster);
}
@@ -171,10 +172,8 @@ public class HazelcastClusterImpl implements ClusterManager {
clusterMessageTopic.addMessageListener(new ClusterMessageListener());
}
- public void registerClusterMemberListener(Object scheduler) {
- _scheduler = (SimpleScheduler) scheduler;
- markAsMaster();
- _scheduler.setClusterManager(this);
+ public void registerClusterMemberListener(ClusterMemberListener listener) {
+ _listener = listener;
}
public void shutdown() {
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
----------------------------------------------------------------------
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 0c1b296..c885d13 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -242,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle {
_ode._executorService = Executors.newCachedThreadPool();
else
_ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize());
- _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false);
+ _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties());
_ode._scheduler.setJobProcessor(_ode._server);
_ode._scheduler.setExecutorService(_ode._executorService);
_ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager());
http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a0dbf5a..df33ae0 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -103,19 +103,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
private boolean _isClusterEnabled;
- private String _masterId;
-
private ClusterManager _clusterManager;
- /** All the nodes which are taken from the database*/
- private CopyOnWriteArraySet<String> _dbNodes = new CopyOnWriteArraySet<String>();
+ /** All the nodes we know about */
+ private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
/** All the stale nodes */
private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
- /** All the nodes when members are added to the cluster*/
- private CopyOnWriteArraySet<String> _clusterNodes = new CopyOnWriteArraySet<String>();
-
/** When we last heard from our nodes. */
//private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
@@ -146,6 +141,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
+ public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+ this(nodeId,del,conf,false);
+ }
+
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
_nodeId = nodeId;
_db = del;
@@ -469,25 +468,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
- _dbNodes.clear();
- _clusterNodes.clear();
+ _knownNodes.clear();
_staleNodes.clear();
- try {
- execTransaction(new Callable<Void>() {
-
- public Void call() throws Exception {
- _dbNodes.addAll(_db.getNodeIds());
- return null;
- }
-
- });
- } catch (Exception ex) {
- __log.error("Error retrieving node list.", ex);
- throw new ContextException("Error retrieving node list.", ex);
- }
- _clusterNodes.add(_nodeId);
-
long now = System.currentTimeMillis();
// Pretend we got a heartbeat...
@@ -496,11 +479,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
- // schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ if(!_isClusterEnabled) enqueueTasksReadnodeIds();
- // do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ else {
+ if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds();
+ }
_todo.start();
_running = true;
@@ -529,7 +512,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
}
public void memberAdded(String nodeId) {
- _clusterNodes.add(nodeId);
+ _knownNodes.add(nodeId);
}
public void memberRemoved(String nodeId) {
@@ -537,16 +520,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
}
// Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
- public void memberElectedAsMaster() {
- _masterId = _nodeId;
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval)));
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval)));
- _dbNodes.clear();
+ public void memberElectedAsMaster(String masterId) {
+ enqueueTasksReadnodeIds();
+ }
+
+ private void enqueueTasksReadnodeIds() {
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
- _dbNodes.addAll(_db.getNodeIds());
+ _knownNodes.addAll(_db.getNodeIds());
return null;
}
@@ -555,6 +538,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
__log.error("Error retrieving node list.", ex);
throw new ContextException("Error retrieving node list.", ex);
}
+
+ //make double sure all the active nodes are included into _knownNodes
+ if(_isClusterEnabled) _knownNodes.addAll(_clusterManager.getActiveNodes());
+
+ else _knownNodes.add(_nodeId);
+
+ long now = System.currentTimeMillis();
+
+ // schedule check for stale nodes, make it random so that the nodes don't overlap.
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+
+ // do the upgrade sometime (random) in the immediate interval.
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
}
class RunJob implements Callable<Void> {
@@ -880,9 +876,13 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
if(_isClusterEnabled) _staleNodes.remove(nodeId);
- // If the stale node id is in _clusterNodes or _dbNodes, remove it.
- _clusterNodes.remove(nodeId);
- _dbNodes.remove(nodeId);
+ // If the stale node id is in _knownNodes, remove it.
+ _knownNodes.remove(nodeId);
+
+ // We can now forget about this node, if we see it again, it will be
+ // "new to us"
+ //_knownNodes.remove(nodeId);
+ //_lastHeartBeat.remove(nodeId);
// Force a load-immediate to catch anything new from the recovered node.
doLoadImmediate();
@@ -981,9 +981,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
_todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
__log.debug("CHECK STALE NODES started");
- ArrayList<String> knownNodes = new ArrayList<String>();
- knownNodes.addAll(_dbNodes);
- knownNodes.addAll(_clusterNodes);
+ ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
// for cluster mode
if (_isClusterEnabled && _clusterManager.getIsMaster()) {
@@ -1006,6 +1004,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
}
}
+ /*for (String nodeId : _knownNodes) {
+ Long lastSeen = _lastHeartBeat.get(nodeId);
+ if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ && !_nodeId.equals(nodeId))
+ {
+ recoverStaleNode(nodeId);
+ }
+ }*/
}
}