You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:19 UTC

[24/30] ode git commit: Cluster Enabled Simple Scheduler-1

Cluster Enabled Simple Scheduler-1


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/15f1883c
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/15f1883c
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/15f1883c

Branch: refs/heads/ODE-563
Commit: 15f1883c40e845b430c5c40418333ccccfa48b6a
Parents: 9ffe0c7
Author: suba <su...@cse.mrt.ac.lk>
Authored: Sun Jul 19 00:41:15 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Sun Jul 19 00:41:15 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/ode/axis2/ODEServer.java    |  12 +-
 .../org/apache/ode/test/BPELTestAbstract.java   |  53 ++++----
 .../hazelcast/HazelcastClusterImpl.java         |  29 ++++-
 .../java/org/apache/ode/jbi/OdeLifeCycle.java   |  25 ++--
 .../ode/scheduler/simple/SchedulerListener.java |  27 ++++
 .../ode/scheduler/simple/SimpleScheduler.java   | 123 +++++++++++++++----
 6 files changed, 191 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index da62139..6803350 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -50,6 +50,7 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
+import org.omg.CORBA.StringHolder;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -195,7 +196,10 @@ public class ODEServer {
         registerExternalVariableModules();
 
         _store.loadAll();
-        if (_clusterManager != null) _clusterManager.registerClusterProcessStoreMessageListener();
+        if (_clusterManager != null) {
+            _clusterManager.registerClusterProcessStoreMessageListener();
+            _clusterManager.setScheduler(_scheduler);
+        }
 
         try {
             _bpelServer.start();
@@ -524,8 +528,10 @@ public class ODEServer {
     }
 
     protected Scheduler createScheduler() {
-        SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
-                new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+        String nodeId;
+        if (isClusteringEnabled) nodeId = _clusterManager.getUuid();
+        else nodeId = new GUID().toString();
+        SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
         scheduler.setExecutorService(_executorService);
         scheduler.setTransactionManager(_txMgr);
         return scheduler;

http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
----------------------------------------------------------------------
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index d24b59b..cdda50e 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -18,44 +18,14 @@
  */
 package org.apache.ode.test;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.iapi.ProcessStore;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.il.EmbeddedGeronimoFactory;
 import org.apache.ode.il.config.OdeConfigProperties;
@@ -71,6 +41,25 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.w3c.dom.Element;
 
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public abstract class BPELTestAbstract {
     private static final Log log = LogFactory.getLog(BPELTestAbstract.class);
     public static final long WAIT_BEFORE_INVOKE_TIMEOUT = 2000;
@@ -139,7 +128,7 @@ public abstract class BPELTestAbstract {
 
         {
             JdbcDelegate del = new JdbcDelegate(_dataSource);
-            scheduler = new SimpleScheduler("node", del, props);
+            scheduler = new SimpleScheduler("node", del, props,false);
             scheduler.setTransactionManager(_txManager);
             _cf = new BpelDAOConnectionFactoryImpl(scheduler);
             _server.setDaoConnectionFactory(_cf);

http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 5f2b8f5..63a889a 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.ode.bpel.clapi.*;
+import org.apache.ode.bpel.iapi.Scheduler;
 
 /**
  * This class implements necessary methods to build the cluster using hazelcast
@@ -46,6 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager {
     private IMap<Long, Long> instance_lock_map;
     private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
     private ClusterProcessStore _clusterProcessStore;
+    private Scheduler _scheduler;
     private ClusterLock<String> _hazelcastDeploymentLock;
     private ClusterLock<Long> _hazelcastInstanceLock;
 
@@ -89,13 +91,17 @@ public class HazelcastClusterImpl implements ClusterManager {
     class ClusterMemberShipListener implements MembershipListener {
         @Override
         public void memberAdded(MembershipEvent membershipEvent) {
-            __log.info("Member Added " +membershipEvent.getMember().getUuid());
+            String nodeId =  membershipEvent.getMember().getUuid();
+            __log.info("Member Added " +nodeId);
+            if(isMaster) _simpleScheduler.memberAdded(nodeId);
         }
 
         @Override
         public void memberRemoved(MembershipEvent membershipEvent) {
-            __log.info("Member Removed " +membershipEvent.getMember().getUuid());
+            String nodeId =  membershipEvent.getMember().getUuid();
+            __log.info("Member Removed " +nodeId);
             markAsMaster();
+            if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid);
         }
 
         @Override
@@ -145,6 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
         if (leader.localMember()) {
             isMaster = true;
+            _simpleScheduler.setIsMasterNode(true);
         }
         __log.info(isMaster);
     }
@@ -153,8 +160,17 @@ public class HazelcastClusterImpl implements ClusterManager {
         return isMaster;
     }
 
+    public String getUuid() {
+        return uuid;
+    }
+
     public void setClusterProcessStore(ClusterProcessStore store) {
-            _clusterProcessStore = store;
+        _clusterProcessStore = store;
+    }
+
+    public void setScheduler(Scheduler scheduler) {
+        _scheduler = scheduler;
+        _scheduler.setClusterManager(this);
     }
 
     public void registerClusterProcessStoreMessageListener() {
@@ -172,5 +188,12 @@ public class HazelcastClusterImpl implements ClusterManager {
     public ClusterLock<Long> getInstanceLock(){
         return _hazelcastInstanceLock;
     }
+
+    public List<String> getKnownNodes() {
+        List<String> nodesList = new ArrayList<String>();
+        for(Member m : _hazelcastInstance.getCluster().getMembers())
+          nodesList.add(m.getUuid()) ;
+        return nodesList;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
----------------------------------------------------------------------
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 40fb044..0c1b296 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -19,18 +19,6 @@
 
 package org.apache.ode.jbi;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.concurrent.Executors;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.component.ServiceUnitManager;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
@@ -50,6 +38,17 @@ import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.component.ServiceUnitManager;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.concurrent.Executors;
+
 /**
  * This class implements ComponentLifeCycle. The JBI framework will start this engine class automatically when JBI framework starts
  * up.
@@ -243,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle {
             _ode._executorService = Executors.newCachedThreadPool();
         else
             _ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize());
-        _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties());
+        _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false);
         _ode._scheduler.setJobProcessor(_ode._server);
         _ode._scheduler.setExecutorService(_ode._executorService);
         _ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager());

http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
new file mode 100644
index 0000000..3786912
--- /dev/null
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+/** Cluster membership callbacks; implemented by SimpleScheduler to redistribute and recover jobs. */
+public interface SchedulerListener {
+    /** Called when the node identified by {@code nodeId} joins the cluster. */
+    void memberAdded(String nodeId);
+    /** Called when {@code nodeId} leaves the cluster; {@code masterId} identifies the master node. */
+    void memberRemoved(String nodeId, String masterId);
+}

http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a56b86e..3b6ec4d 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -19,33 +19,19 @@
 
 package org.apache.ode.scheduler.simple;
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 
+import javax.transaction.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A reliable and relatively simple scheduler that uses a database to persist information about
  * scheduled tasks.
@@ -66,7 +52,7 @@ import org.apache.ode.bpel.iapi.Scheduler;
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener {
     private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
 
     private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -114,6 +100,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
 
     private DatabaseDelegate _db;
 
+    private boolean _isClusterEnabled;
+
+    private ClusterManager _clusterManager;
+
     /** All the nodes we know about */
     private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
 
@@ -147,9 +137,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
 
     private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
 
-    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
         _nodeId = nodeId;
         _db = del;
+        _isClusterEnabled = clusterState;
         _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
         _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
         _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
@@ -183,6 +174,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
         _nodeId = nodeId;
     }
 
+    public void setClusterManager(ClusterManager cm) {
+        _clusterManager = cm;
+    }
+
     public void setStaleInterval(long staleInterval) {
         _staleInterval = staleInterval;
     }
@@ -490,10 +485,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
         _todo.enqueue(new LoadImmediateTask(now));
 
         // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        if (!_isClusterEnabled)
+            _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
 
         // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+        enqueUpgradeJobsTask(now);
 
         _todo.start();
         _running = true;
@@ -521,6 +517,18 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
         _running = false;
     }
 
+    public void memberAdded(final String nodeId) {
+        enqueUpgradeJobsTask(System.currentTimeMillis()); // rebalance jobs to include the new member
+    }
+    public void memberRemoved(final String nodeId, final String masterId) {
+        recoverClusterStaleNodes(nodeId, masterId);
+    }
+
+    /** Enqueues an UpgradeJobsTask at a random time within the immediate interval after {@code now}. */
+    public void enqueUpgradeJobsTask(long now) {
+        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+    }
+
     class RunJob implements Callable<Void> {
         final Job job;
         final JobProcessor processor;
@@ -814,6 +822,41 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
 
     }
 
+    /**
+     * Cluster-mode variant of doUpgrade(): assigns near-future jobs across all cluster members.
+     * Member ids are sorted so that the index given to each node is deterministic.
+     * @return true if the reassignment transaction committed, false on a database error
+     */
+    boolean doClusterJobsUpgrade() {
+        __log.debug("UPGRADE started for Cluster Mode");
+        // Sort a copy so the collection returned by the cluster manager is never mutated.
+        final List<String> knownNodes = new ArrayList<String>(_clusterManager.getKnownNodes());
+        Collections.sort(knownNodes);
+
+        // We assume the trailing digits of the scheduled times are uniformly distributed and use
+        // modular division of the time by the number of nodes to pick each job's node; this takes
+        // one update statement per node.
+        final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
+        try {
+            return execTransaction(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    int numNodes = knownNodes.size();
+                    for (int i = 0; i < numNodes; ++i) {
+                        String node = knownNodes.get(i);
+                        _db.updateAssignToNode(node, i, numNodes, maxtime);
+                    }
+                    return true;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("Database error upgrading jobs.", ex);
+            return false;
+        } finally {
+            __log.debug("UPGRADE complete");
+        }
+    }
+
+
     /**
      * Re-assign stale node's jobs to self.
      * @param nodeId
@@ -857,6 +900,31 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
 //        return delay;
 //    }
 
+    /**
+     * Moves the jobs of a departed cluster member onto the master node, then reloads
+     * immediate jobs so anything now due is picked up.
+     * @param nodeId id of the node that left the cluster
+     * @param masterId id of the master node that takes over its jobs
+     */
+    void recoverClusterStaleNodes(final String nodeId, final String masterId) {
+        if (__log.isDebugEnabled()) __log.debug("recovering stale nodes for Cluster Mode " + nodeId);
+        try {
+            final Callable<Integer> reassign = new Callable<Integer>() {
+                public Integer call() throws Exception {
+                    return _db.updateReassign(nodeId, masterId);
+                }
+            };
+            final int reassigned = execTransaction(reassign);
+            if (__log.isDebugEnabled()) __log.debug("reassigned " + reassigned + " jobs to master node. ");
+            // Force a load-immediate to catch anything new from the recovered node.
+            doLoadImmediate();
+        } catch (Exception ex) {
+            __log.error("Database error reassigning node.", ex);
+        } finally {
+            __log.debug("node recovery complete");
+        }
+    }
+
     private abstract class SchedulerTask extends Task implements Runnable {
         SchedulerTask(long schedDate) {
             super(schedDate);
@@ -911,7 +979,8 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
 
             boolean success = false;
             try {
-                success = doUpgrade();
+               if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade();
+                else success = doUpgrade();
             } finally {
                 long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
                 _nextUpgrade.set(future);