You are viewing a plain-text version of this content. The link to the canonical version was lost when this copy was converted to plain text.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:21 UTC

[26/30] ode git commit: Cluster Enabled Simple Scheduler-3

Cluster Enabled Simple Scheduler-3


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/43a8df89
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/43a8df89
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/43a8df89

Branch: refs/heads/ODE-563
Commit: 43a8df89b9b05c5831cc5fd4d385b018094f7429
Parents: 3f5ef53
Author: suba <su...@cse.mrt.ac.lk>
Authored: Thu Jul 23 15:52:25 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Thu Jul 23 15:52:25 2015 +0530

----------------------------------------------------------------------
 Rakefile                                        |  2 +-
 .../java/org/apache/ode/axis2/ODEServer.java    | 15 ++--
 .../apache/ode/bpel/clapi/ClusterManager.java   |  4 +-
 .../ode/bpel/clapi/ClusterMemberListener.java   |  2 +-
 .../org/apache/ode/test/BPELTestAbstract.java   |  2 +-
 .../hazelcast/HazelcastClusterImpl.java         | 17 ++--
 .../java/org/apache/ode/jbi/OdeLifeCycle.java   |  2 +-
 .../ode/scheduler/simple/SimpleScheduler.java   | 88 +++++++++++---------
 8 files changed, 71 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 5475227..7c0fa67 100644
--- a/Rakefile
+++ b/Rakefile
@@ -208,7 +208,7 @@ define "ode" do
 
   desc "ODE Clustering"
    define "clustering" do
-     compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging
+     compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
      package :jar
    end
 

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 222fedd..b3f5d2f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -31,6 +31,7 @@ import org.apache.ode.axis2.service.DeploymentWebService;
 import org.apache.ode.axis2.service.ManagementService;
 import org.apache.ode.axis2.util.ClusterUrlTransformer;
 import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
 import org.apache.ode.bpel.connector.BpelServerConnector;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -197,7 +198,9 @@ public class ODEServer {
         _store.loadAll();
         if (_clusterManager != null) {
             _clusterManager.registerClusterProcessStoreMessageListener();
-            _clusterManager.registerClusterMemberListener(_scheduler);
+            if (_scheduler instanceof SimpleScheduler) {
+                _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+            }
         }
 
         try {
@@ -527,10 +530,12 @@ public class ODEServer {
     }
 
     protected Scheduler createScheduler() {
-        String nodeId;
-        if (isClusteringEnabled) nodeId = _clusterManager.getUuid();
-        else nodeId = new GUID().toString();
-        SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+        SimpleScheduler scheduler;
+        if (isClusteringEnabled) {
+            scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+            scheduler.setClusterManager(_clusterManager);
+        } else
+            scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
         scheduler.setExecutorService(_executorService);
         scheduler.setTransactionManager(_txMgr);
         return scheduler;

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index a00959a..07d3d8d 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -59,9 +59,9 @@ public interface ClusterManager {
 
     /**
      * Register Scheduler as ClusterMemberListener
-     * @param scheduler
+     * @param listener
      */
-    void registerClusterMemberListener(Object scheduler);
+    void registerClusterMemberListener(ClusterMemberListener listener);
 
     /**
      * Return deployment lock for cluster

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
index 4225f7d..541ab9c 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -24,6 +24,6 @@ public interface ClusterMemberListener {
 
     void memberRemoved(String nodeId);
 
-    void memberElectedAsMaster();
+    void memberElectedAsMaster(String masterId);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
----------------------------------------------------------------------
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index cdda50e..00bdf7d 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -128,7 +128,7 @@ public abstract class BPELTestAbstract {
 
         {
             JdbcDelegate del = new JdbcDelegate(_dataSource);
-            scheduler = new SimpleScheduler("node", del, props,false);
+            scheduler = new SimpleScheduler("node", del, props);
             scheduler.setTransactionManager(_txManager);
             _cf = new BpelDAOConnectionFactoryImpl(scheduler);
             _server.setDaoConnectionFactory(_cf);

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 971df3e..f68068a 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.ode.bpel.clapi.*;
-import org.apache.ode.scheduler.simple.SimpleScheduler;
 
 /**
  * This class implements necessary methods to build the cluster using hazelcast
@@ -47,7 +46,7 @@ public class HazelcastClusterImpl implements ClusterManager {
     private IMap<Long, Long> instance_lock_map;
     private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
     private ClusterProcessStore _clusterProcessStore;
-    private SimpleScheduler _scheduler;
+    private ClusterMemberListener _listener;
     private ClusterLock<String> _hazelcastDeploymentLock;
     private ClusterLock<Long> _hazelcastInstanceLock;
 
@@ -78,6 +77,8 @@ public class HazelcastClusterImpl implements ClusterManager {
             uuid = localMember.getUuid();
             __log.info("Registering HZ localMember ID " + nodeID);
 
+            markAsMaster();
+
             deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
             instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
             clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
@@ -92,7 +93,7 @@ public class HazelcastClusterImpl implements ClusterManager {
         public void memberAdded(MembershipEvent membershipEvent) {
             String nodeId =  membershipEvent.getMember().getUuid();
             __log.info("Member Added " +nodeId);
-            _scheduler.memberAdded(nodeId);
+            if(isMaster && _listener != null) _listener.memberAdded(nodeId);
         }
 
         @Override
@@ -100,7 +101,7 @@ public class HazelcastClusterImpl implements ClusterManager {
             String nodeId = membershipEvent.getMember().getUuid();
             __log.info("Member Removed " + nodeId);
             markAsMaster();
-            _scheduler.memberRemoved(nodeId);
+            if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
         }
 
         @Override
@@ -150,7 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
         if (leader.localMember() && isMaster == false) {
             isMaster = true;
-            _scheduler.memberElectedAsMaster();
+            if(_listener != null) _listener.memberElectedAsMaster(uuid);
         }
         __log.info(isMaster);
     }
@@ -171,10 +172,8 @@ public class HazelcastClusterImpl implements ClusterManager {
         clusterMessageTopic.addMessageListener(new ClusterMessageListener());
     }
 
-    public void registerClusterMemberListener(Object scheduler) {
-        _scheduler = (SimpleScheduler) scheduler;
-        markAsMaster();
-        _scheduler.setClusterManager(this);
+    public void registerClusterMemberListener(ClusterMemberListener listener) {
+        _listener = listener;
     }
 
     public void shutdown() {

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
----------------------------------------------------------------------
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 0c1b296..c885d13 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -242,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle {
             _ode._executorService = Executors.newCachedThreadPool();
         else
             _ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize());
-        _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false);
+        _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties());
         _ode._scheduler.setJobProcessor(_ode._server);
         _ode._scheduler.setExecutorService(_ode._executorService);
         _ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager());

http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a0dbf5a..df33ae0 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -103,19 +103,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
 
     private boolean _isClusterEnabled;
 
-    private String _masterId;
-
     private ClusterManager _clusterManager;
 
-    /** All the nodes which are taken from the database*/
-    private CopyOnWriteArraySet<String> _dbNodes = new CopyOnWriteArraySet<String>();
+    /** All the nodes we know about */
+    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
 
     /** All the stale nodes */
     private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
 
-    /** All the nodes when members are added to the cluster*/
-    private CopyOnWriteArraySet<String> _clusterNodes = new CopyOnWriteArraySet<String>();
-
     /** When we last heard from our nodes. */
     //private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
 
@@ -146,6 +141,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
 
     private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
 
+    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+       this(nodeId,del,conf,false);
+    }
+
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
         _nodeId = nodeId;
         _db = del;
@@ -469,25 +468,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         _processedSinceLastLoadTask.clear();
         _outstandingJobs.clear();
 
-        _dbNodes.clear();
-        _clusterNodes.clear();
+        _knownNodes.clear();
         _staleNodes.clear();
 
-        try {
-            execTransaction(new Callable<Void>() {
-
-                public Void call() throws Exception {
-                    _dbNodes.addAll(_db.getNodeIds());
-                    return null;
-                }
-
-            });
-        } catch (Exception ex) {
-            __log.error("Error retrieving node list.", ex);
-            throw new ContextException("Error retrieving node list.", ex);
-        }
-        _clusterNodes.add(_nodeId);
-
         long now = System.currentTimeMillis();
 
         // Pretend we got a heartbeat...
@@ -496,11 +479,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         // schedule immediate job loading for now!
         _todo.enqueue(new LoadImmediateTask(now));
 
-        // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        if(!_isClusterEnabled) enqueueTasksReadnodeIds();
 
-        // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+        else {
+            if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds();
+        }
 
         _todo.start();
         _running = true;
@@ -529,7 +512,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
     }
 
     public void memberAdded(String nodeId) {
-        _clusterNodes.add(nodeId);
+        _knownNodes.add(nodeId);
     }
 
     public void memberRemoved(String nodeId) {
@@ -537,16 +520,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
     }
 
     // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
-    public void memberElectedAsMaster() {
-        _masterId = _nodeId;
-        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval)));
-        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval)));
-        _dbNodes.clear();
+    public void memberElectedAsMaster(String masterId) {
+        enqueueTasksReadnodeIds();
+    }
+
+    private void enqueueTasksReadnodeIds() {
         try {
             execTransaction(new Callable<Void>() {
 
                 public Void call() throws Exception {
-                    _dbNodes.addAll(_db.getNodeIds());
+                    _knownNodes.addAll(_db.getNodeIds());
                     return null;
                 }
 
@@ -555,6 +538,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
             __log.error("Error retrieving node list.", ex);
             throw new ContextException("Error retrieving node list.", ex);
         }
+
+        //make double sure all the active nodes are included into _knownNodes
+        if(_isClusterEnabled) _knownNodes.addAll(_clusterManager.getActiveNodes());
+
+        else _knownNodes.add(_nodeId);
+
+        long now = System.currentTimeMillis();
+
+        // schedule check for stale nodes, make it random so that the nodes don't overlap.
+        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+
+        // do the upgrade sometime (random) in the immediate interval.
+        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
     }
 
     class RunJob implements Callable<Void> {
@@ -880,9 +876,13 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
 
             if(_isClusterEnabled) _staleNodes.remove(nodeId);
 
-            // If the stale node id is in _clusterNodes or _dbNodes, remove it.
-            _clusterNodes.remove(nodeId);
-            _dbNodes.remove(nodeId);
+            // If the stale node id is in _knownNodes, remove it.
+            _knownNodes.remove(nodeId);
+
+            // We can now forget about this node, if we see it again, it will be
+            // "new to us"
+            //_knownNodes.remove(nodeId);
+            //_lastHeartBeat.remove(nodeId);
 
             // Force a load-immediate to catch anything new from the recovered node.
             doLoadImmediate();
@@ -981,9 +981,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
             _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
             __log.debug("CHECK STALE NODES started");
 
-            ArrayList<String> knownNodes = new ArrayList<String>();
-            knownNodes.addAll(_dbNodes);
-            knownNodes.addAll(_clusterNodes);
+            ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
 
             // for cluster mode
             if (_isClusterEnabled && _clusterManager.getIsMaster()) {
@@ -1006,6 +1004,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
                     if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
                 }
             }
+            /*for (String nodeId : _knownNodes) {
+                Long lastSeen = _lastHeartBeat.get(nodeId);
+                if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+                        && !_nodeId.equals(nodeId))
+                {
+                    recoverStaleNode(nodeId);
+                }
+            }*/
         }
     }