You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:22 UTC
[27/30] ode git commit: Done cleanup within cluster implementation
Done cleanup within cluster implementation
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/348ae9de
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/348ae9de
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/348ae9de
Branch: refs/heads/ODE-563
Commit: 348ae9deb804ac5e9a9f1bafd1ebdbf53b490e1a
Parents: 43a8df8
Author: suba <su...@cse.mrt.ac.lk>
Authored: Sun Jul 26 15:55:32 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Sun Jul 26 15:55:32 2015 +0530
----------------------------------------------------------------------
.../java/org/apache/ode/axis2/ODEServer.java | 37 ++---
.../ode/axis2/deploy/DeploymentPoller.java | 3 +-
.../ode/axis2/service/DeploymentWebService.java | 40 ++---
.../apache/ode/bpel/clapi/ClusterManager.java | 7 +-
.../bpel/clapi/ProcessStoreClusterEvent.java | 11 +-
.../bpel/clapi/ProcessStoreClusterListener.java | 24 +++
.../ode/il/config/OdeConfigProperties.java | 12 +-
.../ode/store/ClusterProcessStoreImpl.java | 5 +-
.../hazelcast/HazelcastClusterImpl.java | 152 ++++++++++++++-----
.../hazelcast/HazelcastConstants.java | 6 +-
repositories.rb | 2 +-
.../ode/scheduler/simple/SimpleScheduler.java | 16 +-
12 files changed, 192 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index b3f5d2f..4860150 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -32,6 +32,7 @@ import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.clapi.ClusterMemberListener;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -119,7 +120,7 @@ public class ODEServer {
public Runnable txMgrCreatedCallback;
- private boolean isClusteringEnabled = false;
+ private boolean clusteringEnabled = false;
public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
init(config.getServletContext().getRealPath("/WEB-INF"), configContext);
@@ -173,8 +174,8 @@ public class ODEServer {
txMgrCreatedCallback.run();
}
- String clusteringState = _odeConfig.getClusteringState();
- if (clusteringState != null && isClusteringEnabled(clusteringState)) {
+ clusteringEnabled = _odeConfig.isClusteringEnabled();
+ if (clusteringEnabled) {
initClustering();
} else __log.info(__msgs.msgOdeClusteringNotInitialized());
@@ -197,10 +198,9 @@ public class ODEServer {
_store.loadAll();
if (_clusterManager != null) {
- _clusterManager.registerClusterProcessStoreMessageListener();
- if (_scheduler instanceof SimpleScheduler) {
- _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
- }
+ _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+ _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
+ _clusterManager.init(_configRoot);
}
try {
@@ -466,20 +466,8 @@ public class ODEServer {
}
}
- private boolean isClusteringEnabled(String clusteringState) {
- boolean state;
- if (clusteringState.equals("true")) state = true;
- else state = false;
- setClustering(state);
- return state;
- }
-
- private void setClustering (boolean state) {
- isClusteringEnabled = state;
- }
-
- public boolean getIsCluteringEnabled() {
- return isClusteringEnabled;
+ public boolean isClusteringEnabled() {
+ return clusteringEnabled;
}
/**
@@ -493,7 +481,6 @@ public class ODEServer {
} catch (Exception ex) {
__log.error("Error while loading class : " + clusterImplName, ex);
}
- _clusterManager.init(_configRoot);
}
/**
@@ -524,15 +511,15 @@ public class ODEServer {
}
protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) {
- if (isClusteringEnabled)
+ if (clusteringEnabled)
return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager);
else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
}
protected Scheduler createScheduler() {
SimpleScheduler scheduler;
- if (isClusteringEnabled) {
- scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+ if (clusteringEnabled) {
+ scheduler = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), clusteringEnabled);
scheduler.setClusterManager(_clusterManager);
} else
scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index baa790b..169ca4f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -49,7 +49,6 @@ import org.apache.ode.utils.WatchDog;
import javax.xml.namespace.QName;
import java.io.File;
import java.io.FileFilter;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -99,7 +98,7 @@ public class DeploymentPoller {
public DeploymentPoller(File deployDir, final ODEServer odeServer) {
_odeServer = odeServer;
_deployDir = deployDir;
- clusterEnabled = _odeServer.getIsCluteringEnabled();
+ clusterEnabled = _odeServer.isClusteringEnabled();
if (!_deployDir.exists()) {
boolean isDeployDirCreated = _deployDir.mkdir();
if (!isDeployDirCreated) {
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index 61bf00d..01e003b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -20,24 +20,6 @@
package org.apache.ode.axis2.service;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import javax.activation.DataHandler;
-import javax.wsdl.Definition;
-import javax.wsdl.WSDLException;
-import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
-import javax.xml.namespace.QName;
-
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMNamespace;
@@ -45,27 +27,37 @@ import org.apache.axiom.om.OMText;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
-import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.receivers.AbstractMessageReceiver;
import org.apache.axis2.util.Utils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
import org.apache.ode.axis2.ODEServer;
import org.apache.ode.axis2.OdeFault;
import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.hooks.ODEAxisService;
-import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.il.OMUtils;
-import org.apache.ode.utils.fs.FileUtils;
import org.apache.ode.utils.Namespaces;
+import org.apache.ode.utils.fs.FileUtils;
+
+import javax.activation.DataHandler;
+import javax.wsdl.Definition;
+import javax.wsdl.WSDLException;
+import javax.wsdl.factory.WSDLFactory;
+import javax.wsdl.xml.WSDLReader;
+import javax.xml.namespace.QName;
+import java.io.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
/**
* Axis wrapper for process deployment.
@@ -95,7 +87,7 @@ public class DeploymentWebService {
_store = store;
_poller = poller;
_odeServer = odeServer;
- clusterEnabled = _odeServer.getIsCluteringEnabled();
+ clusterEnabled = _odeServer.isClusteringEnabled();
Definition def;
WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 07d3d8d..5a2e0f9 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -38,7 +38,7 @@ public interface ClusterManager {
* Return whether the local member is Master or not
* @return
*/
- boolean getIsMaster();
+ boolean isMaster();
/**
* Set the Process Store object which is used for clustering
@@ -55,8 +55,7 @@ public interface ClusterManager {
/**
* Register a listener for process store cluster messages
*/
- void registerClusterProcessStoreMessageListener();
-
+ void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener);
/**
* Register Scheduler as ClusterMemberListener
* @param listener
@@ -81,5 +80,5 @@ public interface ClusterManager {
/**
* Return local member's uuid in the cluster
*/
- String getUuid();
+ String getNodeID();
}
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
index a396f6f..79d9a78 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
@@ -27,7 +27,8 @@ public abstract class ProcessStoreClusterEvent implements Serializable {
protected String info ;
- private String uuid;
+ /** Unique ID of the Node in the Cluster generating the Event */
+ private String eventGeneratingNode;
public ProcessStoreClusterEvent(String deploymentUnit) {
this.deploymentUnit = deploymentUnit;
@@ -38,12 +39,12 @@ public abstract class ProcessStoreClusterEvent implements Serializable {
return "{ProcessStoreClusterEvent#" + deploymentUnit +"}";
}
- public void setUuid(String uuid) {
- this.uuid = uuid;
+ public void setEventGeneratingNode(String uuid) {
+ this.eventGeneratingNode = uuid;
}
- public String getUuid() {
- return uuid;
+ public String getEventGeneratingNode() {
+ return eventGeneratingNode;
}
public String getDuName() {
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
new file mode 100644
index 0000000..26f42cf
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.bpel.clapi;
+
+public interface ProcessStoreClusterListener {
+ public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
----------------------------------------------------------------------
diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
index 5c0ed13..5697422 100644
--- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
+++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
@@ -19,6 +19,10 @@
package org.apache.ode.il.config;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.utils.SystemUtils;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -26,10 +30,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.SystemUtils;
-
/**
* Configuration object used for configuring the integration layer. The properties are those likely to be common to all layers.
*
@@ -295,8 +295,8 @@ public class OdeConfigProperties {
return getProperty(OdeConfigProperties.PROP_DEPLOY_DIR);
}
- public String getClusteringState() {
- return getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED);
+ public boolean isClusteringEnabled() {
+ return Boolean.valueOf(getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED, "false"));
}
public String getClusteringImplClass() {
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 51fea5a..d701e23 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -47,7 +47,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster
public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
super(eprContext,ds,persistenceType,props,createDatamodel);
_clusterManager = clusterManager;
- _clusterManager.setClusterProcessStore(this);
}
public Collection<QName> deploy(final File deploymentUnitDirectory) {
@@ -59,7 +58,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster
private void publishProcessStoreDeployedEvent(String duName){
deployedEvent = new ProcessStoreDeployedEvent(duName);
_clusterManager.publishProcessStoreClusterEvent(deployedEvent);
- __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getUuid());
+ __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getEventGeneratingNode());
}
public void deployProcesses(final String duName) {
@@ -121,7 +120,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster
private void publishProcessStoreUndeployedEvent(String duName){
undeployedEvent = new ProcessStoreUndeployedEvent(duName);
_clusterManager.publishProcessStoreClusterEvent(undeployedEvent);
- __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getUuid());
+ __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getEventGeneratingNode());
}
/**
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index f68068a..9d2a554 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -18,8 +18,20 @@
*/
package org.apache.ode.clustering.hazelcast;
-import com.hazelcast.core.*;
+import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.ListenerConfig;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
import java.io.File;
import java.io.FileNotFoundException;
@@ -34,21 +46,28 @@ import org.apache.ode.bpel.clapi.*;
/**
* This class implements necessary methods to build the cluster using hazelcast
*/
-public class HazelcastClusterImpl implements ClusterManager {
+public class HazelcastClusterImpl implements ClusterManager, ProcessStoreClusterListener {
private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
private HazelcastInstance _hazelcastInstance;
private boolean isMaster = false;
+ private String nodeHostName;
private String nodeID;
- private String uuid;
- private Member leader;
private IMap<String, String> deployment_lock_map;
private IMap<Long, Long> instance_lock_map;
- private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
+ private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic;
private ClusterProcessStore _clusterProcessStore;
- private ClusterMemberListener _listener;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
+ private ClusterDeploymentMessageListener clusterDeploymentMessageListener;
+ private ClusterMemberShipListener clusterMemberShipListener;
+ private List<ClusterMemberListener> clusterMemberListenerList = null;
+
+ public HazelcastClusterImpl() {
+ clusterMemberShipListener = new ClusterMemberShipListener();
+ clusterDeploymentMessageListener = new ClusterDeploymentMessageListener();
+ clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this);
+ }
public void init(File configRoot) {
@@ -63,7 +82,8 @@ public class HazelcastClusterImpl implements ClusterManager {
__log.error("hazelcast.xml does not exist or is not a file");
else
try {
- _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+ Config config = loadConfig(hzXml);
+ _hazelcastInstance = Hazelcast.newHazelcastInstance(config);
} catch (FileNotFoundException fnf) {
__log.error(fnf);
}
@@ -71,37 +91,71 @@ public class HazelcastClusterImpl implements ClusterManager {
if (_hazelcastInstance != null) {
// Registering this node in the cluster.
- _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
+ //_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
Member localMember = _hazelcastInstance.getCluster().getLocalMember();
- nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort();
- uuid = localMember.getUuid();
- __log.info("Registering HZ localMember ID " + nodeID);
-
- markAsMaster();
+ nodeHostName = localMember.getSocketAddress().getHostName() + ":" + localMember.getSocketAddress().getPort();
+ nodeID = localMember.getUuid();
+ __log.info("Registering HZ localMember:" + nodeHostName);
deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
- clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
+ clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
_hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map);
_hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map);
+
+ markAsMaster();
}
}
+ protected Config loadConfig(File hazelcastConfigFile) throws FileNotFoundException {
+ Config config = new FileSystemXmlConfig(hazelcastConfigFile);
+
+ //add Cluster membership listener
+ ListenerConfig clusterMemberShipListenerConfig = new ListenerConfig();
+ clusterMemberShipListenerConfig.setImplementation(clusterMemberShipListener);
+ config.addListenerConfig(clusterMemberShipListenerConfig);
+
+ //set topic message listener
+ ListenerConfig topicListenerConfig = new ListenerConfig();
+ topicListenerConfig.setImplementation(clusterDeploymentMessageListener);
+ TopicConfig topicConfig = config.getTopicConfig(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+ topicConfig.addMessageListenerConfig(topicListenerConfig);
+
+ return config;
+ }
+
class ClusterMemberShipListener implements MembershipListener {
+
+ public ClusterMemberShipListener() {
+ clusterMemberListenerList = new ArrayList<ClusterMemberListener>();
+ }
+
+ public void registerClusterMemberListener(ClusterMemberListener listener) {
+ clusterMemberListenerList.add(listener);
+ }
+
@Override
public void memberAdded(MembershipEvent membershipEvent) {
- String nodeId = membershipEvent.getMember().getUuid();
- __log.info("Member Added " +nodeId);
- if(isMaster && _listener != null) _listener.memberAdded(nodeId);
+ String eventNodeID = membershipEvent.getMember().getUuid();
+ __log.info("Member Added " + eventNodeID);
+ if (isMaster) {
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberAdded(eventNodeID);
+ }
+ }
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
- String nodeId = membershipEvent.getMember().getUuid();
- __log.info("Member Removed " + nodeId);
+ String eventNodeID = membershipEvent.getMember().getUuid();
+ __log.info("Member Removed " + eventNodeID);
markAsMaster();
- if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
+ if (isMaster) {
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberRemoved(eventNodeID);
+ }
+ }
}
@Override
@@ -111,36 +165,48 @@ public class HazelcastClusterImpl implements ClusterManager {
}
public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) {
- clusterEvent.setUuid(uuid);
- __log.info("Send " +clusterEvent.getInfo() +" Cluster Message " +"for " +clusterEvent.getDuName() +" [" +nodeID +"]");
- clusterMessageTopic.publish(clusterEvent);
+ clusterEvent.setEventGeneratingNode(nodeID);
+ __log.info("Send " + clusterEvent.getInfo() + " Cluster Message " + "for " + clusterEvent.getDuName() + " [" + nodeHostName + "]");
+ clusterDeploymentMessageTopic.publish(clusterEvent);
}
- class ClusterMessageListener implements MessageListener<ProcessStoreClusterEvent> {
+ class ClusterDeploymentMessageListener implements MessageListener<ProcessStoreClusterEvent> {
+ List<ProcessStoreClusterListener> clusterProcessStoreListenerList = null;
+
+ public ClusterDeploymentMessageListener() {
+ clusterProcessStoreListenerList = new ArrayList<ProcessStoreClusterListener>();
+ }
+
+ public void registerClusterProcessStoreListener(ProcessStoreClusterListener listener) {
+ clusterProcessStoreListenerList.add(listener);
+ }
+
@Override
public void onMessage(Message<ProcessStoreClusterEvent> msg) {
- handleEvent(msg.getMessageObject());
+ for (ProcessStoreClusterListener listener : clusterProcessStoreListenerList) {
+ listener.onProcessStoreClusterEvent(msg.getMessageObject());
+ }
}
}
- private void handleEvent(ProcessStoreClusterEvent message) {
+ public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message) {
if (message instanceof ProcessStoreDeployedEvent) {
ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
- String eventUuid = event.getUuid();
- if (!uuid.equals(eventUuid)) {
+ String eventUuid = event.getEventGeneratingNode();
+ if (!nodeID.equals(eventUuid)) {
String duName = event.getDuName();
- __log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]");
+ __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
_clusterProcessStore.deployProcesses(duName);
}
}
else if (message instanceof ProcessStoreUndeployedEvent) {
ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
- String eventUuid = event.getUuid();
- if (!uuid.equals(eventUuid)) {
+ String eventUuid = event.getEventGeneratingNode();
+ if (!nodeID.equals(eventUuid)) {
String duName = event.getDuName();
- __log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]");
+ __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
_clusterProcessStore.undeployProcesses(duName);
}
}
@@ -148,36 +214,38 @@ public class HazelcastClusterImpl implements ClusterManager {
}
private void markAsMaster() {
- leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
- if (leader.localMember() && isMaster == false) {
+ Member member = _hazelcastInstance.getCluster().getMembers().iterator().next();
+ if (member.localMember() && isMaster == false) {
isMaster = true;
- if(_listener != null) _listener.memberElectedAsMaster(uuid);
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberElectedAsMaster(nodeID);
+ }
}
__log.info(isMaster);
}
- public boolean getIsMaster() {
+ public boolean isMaster() {
return isMaster;
}
- public String getUuid() {
- return uuid;
+ public String getNodeID() {
+ return nodeID;
}
public void setClusterProcessStore(ClusterProcessStore store) {
_clusterProcessStore = store;
}
- public void registerClusterProcessStoreMessageListener() {
- clusterMessageTopic.addMessageListener(new ClusterMessageListener());
+ public void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener) {
+ clusterDeploymentMessageListener.registerClusterProcessStoreListener(listener);
}
public void registerClusterMemberListener(ClusterMemberListener listener) {
- _listener = listener;
+ clusterMemberShipListener.registerClusterMemberListener(listener);
}
public void shutdown() {
- if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown();
+ if (_hazelcastInstance != null) _hazelcastInstance.shutdown();
}
public ClusterLock<String> getDeploymentLock(){
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
index 76e7341..aa787e9 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -21,9 +21,9 @@ package org.apache.ode.clustering.hazelcast;
* Constants used in Hazelcast based clustering implementation
*/
public final class HazelcastConstants {
- public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "DEPLOYMENT_LOCK";
- public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "PROCESS_INSTANCE_LOCK ";
- public static final String ODE_CLUSTER_MSG = "CLUSTER_MSG";
+ public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "ODE_DEPLOYMENT_LOCK";
+ public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "ODE_PROCESS_INSTANCE_LOCK ";
+ public static final String ODE_CLUSTER_DEPLOYMENT_TOPIC = "ODE_DEPLOYMENT_TOPIC";
private HazelcastConstants() {
}
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/repositories.rb
----------------------------------------------------------------------
diff --git a/repositories.rb b/repositories.rb
index f6a40fa..e23cde1 100644
--- a/repositories.rb
+++ b/repositories.rb
@@ -15,5 +15,5 @@
repositories.remote << "http://repo1.maven.org/maven2"
repositories.remote << "http://people.apache.org/~vanto/m2/"
-repositories.remote << "https://repository.apache.org/content/groups/snapshots"
+repositories.remote << "http://repository.apache.org/content/groups/snapshots"
repositories.release_to[:url] ||= "sftp://guest@localhost/home/guest"
http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index df33ae0..1da5571 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -482,7 +482,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
if(!_isClusterEnabled) enqueueTasksReadnodeIds();
else {
- if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds();
+ if (_clusterManager.isMaster()) enqueueTasksReadnodeIds();
}
_todo.start();
@@ -725,16 +725,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
}
}
- /*public void updateHeartBeat(String nodeId) {
+ public void updateHeartBeat(String nodeId) {
if (nodeId == null)
return;
- if (_nodeId.equals(nodeId))
- return;
+ /*if (_nodeId.equals(nodeId))
+ return;*/
- _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+ //_lastHeartBeat.put(nodeId, System.currentTimeMillis());
_knownNodes.add(nodeId);
- }*/
+ }
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
@@ -815,7 +815,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
final ArrayList<String> activeNodes;
// for cluster mode
- if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+ if (_isClusterEnabled && _clusterManager.isMaster()) {
activeNodes = (ArrayList) _clusterManager.getActiveNodes();
}
//for standalone ODE deployments
@@ -984,7 +984,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
// for cluster mode
- if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+ if (_isClusterEnabled && _clusterManager.isMaster()) {
ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
//find stale nodes