You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:19 UTC
[24/30] ode git commit: Cluster Enabled Simple Scheduler-1
Cluster Enabled Simple Scheduler-1
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/15f1883c
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/15f1883c
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/15f1883c
Branch: refs/heads/ODE-563
Commit: 15f1883c40e845b430c5c40418333ccccfa48b6a
Parents: 9ffe0c7
Author: suba <su...@cse.mrt.ac.lk>
Authored: Sun Jul 19 00:41:15 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Sun Jul 19 00:41:15 2015 +0530
----------------------------------------------------------------------
.../java/org/apache/ode/axis2/ODEServer.java | 12 +-
.../org/apache/ode/test/BPELTestAbstract.java | 53 ++++----
.../hazelcast/HazelcastClusterImpl.java | 29 ++++-
.../java/org/apache/ode/jbi/OdeLifeCycle.java | 25 ++--
.../ode/scheduler/simple/SchedulerListener.java | 27 ++++
.../ode/scheduler/simple/SimpleScheduler.java | 123 +++++++++++++++----
6 files changed, 191 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index da62139..6803350 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -50,6 +50,7 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
+import org.omg.CORBA.StringHolder;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -195,7 +196,10 @@ public class ODEServer {
registerExternalVariableModules();
_store.loadAll();
- if (_clusterManager != null) _clusterManager.registerClusterProcessStoreMessageListener();
+ if (_clusterManager != null) {
+ _clusterManager.registerClusterProcessStoreMessageListener();
+ _clusterManager.setScheduler(_scheduler);
+ }
try {
_bpelServer.start();
@@ -524,8 +528,10 @@ public class ODEServer {
}
protected Scheduler createScheduler() {
- SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
- new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+ String nodeId;
+ if (isClusteringEnabled) nodeId = _clusterManager.getUuid();
+ else nodeId = new GUID().toString();
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
scheduler.setExecutorService(_executorService);
scheduler.setTransactionManager(_txMgr);
return scheduler;
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
----------------------------------------------------------------------
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index d24b59b..cdda50e 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -18,44 +18,14 @@
*/
package org.apache.ode.test;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.iapi.ProcessStore;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.il.EmbeddedGeronimoFactory;
import org.apache.ode.il.config.OdeConfigProperties;
@@ -71,6 +41,25 @@ import org.junit.Assert;
import org.junit.Before;
import org.w3c.dom.Element;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public abstract class BPELTestAbstract {
private static final Log log = LogFactory.getLog(BPELTestAbstract.class);
public static final long WAIT_BEFORE_INVOKE_TIMEOUT = 2000;
@@ -139,7 +128,7 @@ public abstract class BPELTestAbstract {
{
JdbcDelegate del = new JdbcDelegate(_dataSource);
- scheduler = new SimpleScheduler("node", del, props);
+ scheduler = new SimpleScheduler("node", del, props,false);
scheduler.setTransactionManager(_txManager);
_cf = new BpelDAOConnectionFactoryImpl(scheduler);
_server.setDaoConnectionFactory(_cf);
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 5f2b8f5..63a889a 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.*;
+import org.apache.ode.bpel.iapi.Scheduler;
/**
* This class implements necessary methods to build the cluster using hazelcast
@@ -46,6 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager {
private IMap<Long, Long> instance_lock_map;
private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
private ClusterProcessStore _clusterProcessStore;
+ private Scheduler _scheduler;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
@@ -89,13 +91,17 @@ public class HazelcastClusterImpl implements ClusterManager {
class ClusterMemberShipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
- __log.info("Member Added " +membershipEvent.getMember().getUuid());
+ String nodeId = membershipEvent.getMember().getUuid(); // UUID of the member that joined; reused for the log line and the scheduler call
+ __log.info("Member Added " +nodeId);
+ if(isMaster) _simpleScheduler.memberAdded(nodeId); // forwarded only while isMaster is set. NOTE(review): the field added by this diff is _scheduler (type Scheduler); _simpleScheduler is not declared in the visible hunks - confirm it exists and is assigned before cluster events arrive
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
- __log.info("Member Removed " +membershipEvent.getMember().getUuid());
+ String nodeId = membershipEvent.getMember().getUuid(); // UUID of the member that left
+ __log.info("Member Removed " +nodeId);
markAsMaster();
+ if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid); // runs after markAsMaster(), so the check sees the refreshed isMaster flag; passes this node's uuid as the master id. NOTE(review): _simpleScheduler is not declared in the visible hunks (the added field is _scheduler) - confirm
}
@Override
@@ -145,6 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember()) {
isMaster = true;
+ _simpleScheduler.setIsMasterNode(true);
}
__log.info(isMaster);
}
@@ -153,8 +160,17 @@ public class HazelcastClusterImpl implements ClusterManager {
return isMaster;
}
+ public String getUuid() { // Exposes this manager's uuid field; ODEServer.createScheduler uses it as the scheduler node id when clustering is enabled
+ return uuid;
+ }
+
public void setClusterProcessStore(ClusterProcessStore store) {
- _clusterProcessStore = store;
+ _clusterProcessStore = store; // keeps the given store in the _clusterProcessStore field; the -/+ pair here differs only in whitespace
+ }
+
+ public void setScheduler(Scheduler scheduler) { // Wires the scheduler and this cluster manager together (called from ODEServer after the store is loaded)
+ _scheduler = scheduler;
+ _scheduler.setClusterManager(this); // hands this manager back to the scheduler; throws NullPointerException if scheduler is null. NOTE(review): requires setClusterManager on the Scheduler interface - not visible here, confirm
}
public void registerClusterProcessStoreMessageListener() {
@@ -172,5 +188,12 @@ public class HazelcastClusterImpl implements ClusterManager {
public ClusterLock<Long> getInstanceLock(){
return _hazelcastInstanceLock;
}
+
+ public List<String> getKnownNodes() { // Returns a newly built list holding the UUID of every member currently reported by the Hazelcast cluster
+ List<String> nodesList = new ArrayList<String>();
+ for(Member m : _hazelcastInstance.getCluster().getMembers())
+ nodesList.add(m.getUuid()) ;
+ return nodesList; // a fresh list on every call, so callers may reorder it without affecting this manager
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
----------------------------------------------------------------------
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 40fb044..0c1b296 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -19,18 +19,6 @@
package org.apache.ode.jbi;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.concurrent.Executors;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.component.ServiceUnitManager;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
@@ -50,6 +38,17 @@ import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.component.ServiceUnitManager;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.concurrent.Executors;
+
/**
* This class implements ComponentLifeCycle. The JBI framework will start this engine class automatically when JBI framework starts
* up.
@@ -243,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle {
_ode._executorService = Executors.newCachedThreadPool();
else
_ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize());
- _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties());
+ _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false);
_ode._scheduler.setJobProcessor(_ode._server);
_ode._scheduler.setExecutorService(_ode._executorService);
_ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager());
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
new file mode 100644
index 0000000..3786912
--- /dev/null
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+public interface SchedulerListener { // Cluster membership callbacks; SimpleScheduler implements this interface
+
+ void memberAdded(String nodeId); // nodeId: identifier of the member that joined the cluster
+
+ void memberRemoved(String nodeId,String masterId); // nodeId: member that left; masterId: node id supplied by the caller (HazelcastClusterImpl passes its own uuid while it is master)
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a56b86e..3b6ec4d 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -19,33 +19,19 @@
package org.apache.ode.scheduler.simple;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
+import javax.transaction.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A reliable and relatively simple scheduler that uses a database to persist information about
* scheduled tasks.
@@ -66,7 +52,7 @@ import org.apache.ode.bpel.iapi.Scheduler;
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -114,6 +100,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
private DatabaseDelegate _db;
+ private boolean _isClusterEnabled;
+
+ private ClusterManager _clusterManager;
+
/** All the nodes we know about */
private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
@@ -147,9 +137,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
- public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+ public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
_nodeId = nodeId;
_db = del;
+ _isClusterEnabled = clusterState;
_todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
_immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
_nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
@@ -183,6 +174,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
_nodeId = nodeId;
}
+ public void setClusterManager(ClusterManager cm) { // Stores the cluster manager later queried for the master flag and the known node list
+ _clusterManager = cm;
+ }
+
public void setStaleInterval(long staleInterval) {
_staleInterval = staleInterval;
}
@@ -490,10 +485,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
_todo.enqueue(new LoadImmediateTask(now));
// schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ if (!_isClusterEnabled)
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
// do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ enqueUpgradeJobsTask(now);
_todo.start();
_running = true;
@@ -521,6 +517,18 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
_running = false;
}
+ public void memberAdded(final String nodeId) { // SchedulerListener callback: a node joined, so queue an UpgradeJobsTask; nodeId itself is not used
+ _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis()+ randomMean(_immediateInterval))); // same scheduling as enqueUpgradeJobsTask(System.currentTimeMillis())
+ }
+
+ public void memberRemoved(final String nodeId, final String masterId) { // SchedulerListener callback: a node left; delegates directly to recoverClusterStaleNodes
+ recoverClusterStaleNodes(nodeId, masterId); // runs on the calling thread. NOTE(review): that is presumably the cluster's membership-event thread - confirm blocking on a DB transaction there is acceptable
+ }
+
+ public void enqueUpgradeJobsTask(long now) { // Queues an UpgradeJobsTask at now plus randomMean(_immediateInterval); now is a timestamp in milliseconds (start() passes its own 'now')
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ }
+
class RunJob implements Callable<Void> {
final Job job;
final JobProcessor processor;
@@ -814,6 +822,41 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
}
+ /**
+ * Cluster-mode job upgrade: assigns near-future jobs to the nodes reported by the cluster manager.
+ * Issues one {@code updateAssignToNode} call per known node inside a single {@code execTransaction}.
+ *
+ * @return true if the transaction completed; false if any exception was thrown (the exception is logged)
+ */
+ boolean doClusterJobsUpgrade() {
+ __log.debug("UPGRADE started for Cluster Mode");
+ // Work on a private copy: the cluster manager hands back a List, which cannot be assigned to an
+ // ArrayList without a cast, and sorting must not reorder a list the manager may still own.
+ final List<String> knownNodes = new ArrayList<String>(_clusterManager.getKnownNodes());
+ // Sorted so that a node's index depends only on the set of node ids (presumably to keep the
+ // slot assignment stable between runs).
+ Collections.sort(knownNodes);
+
+ // We're going to try to upgrade near-future jobs using the db only.
+ // We assume that the trailing digits of the scheduled time are uniformly
+ // distributed, and use the scheduled time modulo the number of nodes
+ // to pick the node each job is assigned to.
+ // This can be done in a single update statement per node.
+ final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
+ try {
+ return execTransaction(new Callable<Boolean>() {
+
+ public Boolean call() throws Exception {
+ int numNodes = knownNodes.size();
+ for (int i = 0; i < numNodes; ++i) {
+ String node = knownNodes.get(i);
+ _db.updateAssignToNode(node, i, numNodes, maxtime);
+ }
+ return true;
+ }
+
+ });
+
+ } catch (Exception ex) {
+ __log.error("Database error upgrading jobs.", ex);
+ return false;
+ } finally {
+ // Logged on both the success and the failure path.
+ __log.debug("UPGRADE complete");
+ }
+
+ }
+
+
/**
* Re-assign stale node's jobs to self.
* @param nodeId
@@ -857,6 +900,31 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
// return delay;
// }
+ void recoverClusterStaleNodes(final String nodeId, final String masterId) { // Cluster-mode recovery for a departed node: reassign in the db, then trigger an immediate load; errors are logged, never thrown
+ if (__log.isDebugEnabled()) {
+ __log.debug("recovering stale nodes for Cluster Mode " + nodeId);
+ }
+ try {
+ int numrows = execTransaction(new Callable<Integer>() { // the Integer result is unboxed; a null result would raise NullPointerException, which the catch below logs
+ public Integer call() throws Exception {
+ return _db.updateReassign(nodeId, masterId); // presumably moves jobs owned by nodeId over to masterId and returns the row count - confirm against DatabaseDelegate
+ }
+ });
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("reassigned " + numrows + " jobs to master node. ");
+ }
+
+ // Force a load-immediate to catch anything new from the recovered node.
+ doLoadImmediate();
+
+ } catch (Exception ex) {
+ __log.error("Database error reassigning node.", ex); // also reached if doLoadImmediate() throws, despite the message wording
+ } finally {
+ __log.debug("node recovery complete"); // emitted on both the success and the failure path
+ }
+ }
+
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
super(schedDate);
@@ -911,7 +979,8 @@ public class SimpleScheduler implements Scheduler, TaskRunner {
boolean success = false;
try {
- success = doUpgrade();
+ if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade();
+ else success = doUpgrade();
} finally {
long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
_nextUpgrade.set(future);