You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:04 UTC
[09/30] ode git commit: completed version-1 of deploying processes in
the cluster
completed version-1 of deploying processes in the cluster
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/9cb75820
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/9cb75820
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/9cb75820
Branch: refs/heads/ODE-563
Commit: 9cb75820df9f8a87ebb163e4c1c45d7affd6291f
Parents: 521d640
Author: suba <su...@cse.mrt.ac.lk>
Authored: Wed Jun 17 12:32:40 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Wed Jun 17 12:32:40 2015 +0530
----------------------------------------------------------------------
.../webapp/WEB-INF/conf/ode-axis2.properties | 8 +++-
.../java/org/apache/ode/axis2/Messages.java | 4 ++
.../java/org/apache/ode/axis2/ODEServer.java | 27 ++++++------
.../ode/axis2/deploy/DeploymentPoller.java | 14 +++----
.../ode/axis2/service/DeploymentWebService.java | 41 +++++++++++++------
.../apache/ode/bpel/clapi/ClusterManager.java | 13 +-----
.../bpel/clapi/ProcessStoreUndeployedEvent.java | 40 ++++++++++++++++++
.../ode/store/ClusterProcessStoreImpl.java | 43 ++++++++++++++++----
.../org/apache/ode/store/ProcessStoreImpl.java | 22 ++++++++++
.../hazelcast/HazelcastClusterImpl.java | 37 +++++++++++------
10 files changed, 183 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
----------------------------------------------------------------------
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
index 253037c..03ac79c 100644
--- a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
@@ -94,4 +94,10 @@ ode-axis2.db.emb.name=derby-jpadb
## Event listeners
#ode-axis2.event.listeners=
-#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
\ No newline at end of file
+#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
+
+## Enable clustering
+#ode-axis2.clustering.enabled=true
+
+## Clustering Implementation class.
+#ode-axis2.clustering.impl.class = org.apache.ode.clustering.hazelcast.HazelcastClusterImpl
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/Messages.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/Messages.java b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
index a95c30d..0581c72 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/Messages.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
@@ -58,6 +58,10 @@ public class Messages extends MessageBundle {
return format("Starting ODE ServiceEngine.");
}
+ public String msgOdeClusteringNotInitialized() {
+ return format("Clustering has not been initialized.");
+ }
+
public String msgOdeStarted() {
return format("ODE Service Engine has been started.");
}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index f0ad470..51f05dd 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -57,6 +57,7 @@ import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.service.DeploymentWebService;
import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
+import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -82,8 +83,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
-import org.apache.ode.bpel.clapi.ClusterManager;
-
/**
* Server class called by our Axis hooks to handle all ODE lifecycle management.
*
@@ -122,6 +121,8 @@ public class ODEServer {
protected Database _db;
+ protected ClusterManager _clusterManager;
+
private DeploymentPoller _poller;
private BpelServerConnector _connector;
@@ -135,10 +136,6 @@ public class ODEServer {
public Runnable txMgrCreatedCallback;
- private ClusterManager _clusterManager;
-
- private String clusteringState = "";
-
private boolean isClusteringEnabled;
public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
@@ -193,9 +190,10 @@ public class ODEServer {
txMgrCreatedCallback.run();
}
- clusteringState = _odeConfig.getClusteringState();
- if (isClusteringEnabled()) initClustering();
- else __log.info("Clustering has not been initialized");
+ String clusteringState = _odeConfig.getClusteringState();
+ if (clusteringState != null && isClusteringEnabled(clusteringState)) {
+ initClustering();
+ } else __log.info(__msgs.msgOdeClusteringNotInitialized());
__log.debug("Creating data source.");
initDataSource();
@@ -384,6 +382,11 @@ public class ODEServer {
_txMgr = null;
}
+ if (_clusterManager != null) {
+ __log.debug("shutting down cluster manager.");
+ _clusterManager = null;
+ }
+
if (_connector != null) {
try {
__log.debug("shutdown BpelConnector");
@@ -468,7 +471,7 @@ public class ODEServer {
}
}
- public boolean isClusteringEnabled() {
+ private boolean isClusteringEnabled(String clusteringState) {
boolean state;
if (clusteringState.equals("true")) state = true;
else state = false;
@@ -476,7 +479,7 @@ public class ODEServer {
return state;
}
- public void setClustering (boolean state) {
+ private void setClustering (boolean state) {
isClusteringEnabled = state;
}
@@ -493,7 +496,7 @@ public class ODEServer {
Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
_clusterManager = (ClusterManager) clusterImplClass.newInstance();
} catch (Exception ex) {
- __log.error(ex);
+ __log.error("Error while loading class : " +clusterImplName ,ex);
}
_clusterManager.init(_configRoot);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index ccb029b..9964af0 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -41,6 +41,7 @@ package org.apache.ode.axis2.deploy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
import org.apache.ode.utils.WatchDog;
@@ -54,8 +55,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ode.bpel.clapi.ClusterManager;
-
/**
* Polls a directory for the deployment of a new deployment unit.
*/
@@ -140,10 +139,9 @@ public class DeploymentPoller {
// Checking for new deployment directories
if (isDeploymentFromODEFileSystemAllowed() && files != null) {
for (File file : files) {
- String test = file.getName();
- __log.info("Trying to access the lock for " + test);
- __log.info("Test null key value " +test);
- duLocked = pollerTryLock(test);
+ String duName = file.getName();
+ __log.info("Trying to acquire the lock for " + duName);
+ duLocked = pollerTryLock(duName);
if (duLocked) {
try {
@@ -343,7 +341,9 @@ public class DeploymentPoller {
}
}
- //Implementation of IMap key Lock
+ /**
+ * Use to acquire the lock by poller
+ */
private boolean pollerTryLock(String key) {
if(clusterEnabled) {
return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key);
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index 1951cf5..89c5a63 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -172,7 +172,7 @@ public class DeploymentWebService {
_poller.hold();
File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion());
- __log.info("Trying to access the lock for " + dest.getName());
+ __log.info("Trying to acquire the lock for deploying: " + dest.getName());
//lock on deployment unit directory name
duLocked = lock(dest.getName());
@@ -218,7 +218,7 @@ public class DeploymentWebService {
}
sendResponse(factory, messageContext, "deployResponse", response);
} finally {
- __log.info("Trying to release the lock for " + dest.getName());
+ __log.info("Trying to release the lock for deploying: " + dest.getName());
unlock(dest.getName());
}
}
@@ -243,20 +243,30 @@ public class DeploymentWebService {
// Put the poller on hold to avoid undesired side effects
_poller.hold();
- Collection<QName> undeployed = _store.undeploy(deploymentDir);
+ __log.info("Trying to acquire the lock for undeploying: " + deploymentDir.getName());
+ duLocked = lock(deploymentDir.getName());
- File deployedMarker = new File(deploymentDir + ".deployed");
- boolean isDeleted = deployedMarker.delete();
+ if (duLocked) {
+ try {
+ Collection<QName> undeployed = _store.undeploy(deploymentDir);
- if (!isDeleted)
- __log.error("Error while deleting file " + deployedMarker.getName());
+ File deployedMarker = new File(deploymentDir + ".deployed");
+ boolean isDeleted = deployedMarker.delete();
- FileUtils.deepDelete(deploymentDir);
+ if (!isDeleted)
+ __log.error("Error while deleting file " + deployedMarker.getName());
- OMElement response = factory.createOMElement("response", null);
- response.setText("" + (undeployed.size() > 0));
- sendResponse(factory, messageContext, "undeployResponse", response);
- _poller.markAsUndeployed(deploymentDir);
+ FileUtils.deepDelete(deploymentDir);
+
+ OMElement response = factory.createOMElement("response", null);
+ response.setText("" + (undeployed.size() > 0));
+ sendResponse(factory, messageContext, "undeployResponse", response);
+ _poller.markAsUndeployed(deploymentDir);
+ } finally {
+ __log.info("Trying to release the lock for undeploying: " + deploymentDir.getName());
+ unlock(deploymentDir.getName());
+ }
+ }
} finally {
_poller.release();
}
@@ -371,7 +381,9 @@ public class DeploymentWebService {
out.close();
}
- //Implementation of IMap key Lock
+ /**
+ * Acquire the lock when deploying using web service
+ */
private boolean lock(String key) {
if(clusterEnabled) {
return _odeServer.getBpelServer().getContexts().clusterManager.lock(key);
@@ -379,6 +391,9 @@ public class DeploymentWebService {
else return true;
}
+ /**
+ * Release the lock after completing deploy process
+ */
private boolean unlock(String key) {
if(clusterEnabled) {
return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index a1fe194..df4342e 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -30,12 +30,7 @@ public interface ClusterManager {
void init(File file);
/**
- * Check whether current node is the leader or not.
- */
- void markAsMaster();
-
- /**
- * Return isMaster
+ * Return whether the local member is Master or not
* @return
*/
boolean getIsMaster();
@@ -72,10 +67,4 @@ public interface ClusterManager {
* @param event
*/
void publishProcessStoreEvent(Object event);
-
- /**
- * Handle event according to received event
- * @param message
- */
- void handleEvent(Object message);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
new file mode 100644
index 0000000..347312f
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public class ProcessStoreUndeployedEvent implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final String deploymentUnit;
+
+ public final String info;
+
+ public ProcessStoreUndeployedEvent(String deploymentUnit) {
+ this.info = "Undeployment Event";
+ this.deploymentUnit = deploymentUnit;
+ }
+
+ @Override
+ public String toString() {
+ return "{ProcessStoreUndeployedEvent#" + deploymentUnit +"}";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 6f35110..551fd72 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.il.config.OdeConfigProperties;
@@ -36,9 +37,10 @@ import java.util.regex.Pattern;
public class ClusterProcessStoreImpl extends ProcessStoreImpl{
private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
- private final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
+ private final Map<QName, ProcessConfImpl> loaded = new HashMap<QName, ProcessConfImpl>();
private ClusterManager _clusterManager;
private ProcessStoreDeployedEvent deployedEvent;
+ private ProcessStoreUndeployedEvent undeployedEvent;
public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
super(eprContext,ds,persistenceType,props,createDatamodel);
@@ -49,8 +51,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
public Collection<QName> deploy(final File deploymentUnitDirectory) {
Collection<QName> deployed = super.deploy(deploymentUnitDirectory);
Map<QName, ProcessConfImpl> _processes = getProcessesMap();
- for (QName key :_processes.keySet()) {
- if(!loaded.contains(_processes.get(key))) loaded.add(_processes.get(key));
+ for (QName key : deployed) {
+ loaded.put(key,_processes.get(key));
}
publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName());
return deployed;
@@ -67,8 +69,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
Pattern duNamePattern = getPreviousPackageVersionPattern(duName);
- for (Iterator<ProcessConfImpl> iterator = loaded.iterator(); iterator.hasNext();) {
- ProcessConfImpl pconf = iterator.next();
+ for (QName key : loaded.keySet()) {
+ ProcessConfImpl pconf = loaded.get(key);
Matcher matcher = duNamePattern.matcher(pconf.getPackage());
if (matcher.matches() && pconf.getState().equals(state)) {
pconf.setState(ProcessState.RETIRED);
@@ -82,9 +84,10 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
if (dudao != null) {
List<ProcessConfImpl> load = load(dudao);
- loaded.addAll(load);
+ for(ProcessConfImpl p : load) {
+ loaded.put(p.getProcessId(),p);
+ }
confs.addAll(load);
-
}
return null;
}
@@ -97,10 +100,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
try {
fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());
} catch (Exception except) {
- __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except);
+ __log.error("Error with process retiring or activating : pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except);
}
}
- //loadAll();
}
private Pattern getPreviousPackageVersionPattern(String duName) {
@@ -116,4 +118,27 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
Pattern duNamePattern = Pattern.compile(duNameRegExp.toString());
return duNamePattern;
}
+
+ public Collection<QName> undeploy(final File dir) {
+ Collection<QName> undeployed = super.undeploy(dir);
+ loaded.keySet().removeAll(undeployed);
+ publishProcessStoreUndeployedEvent(dir.getName());
+ return undeployed;
+ }
+
+ private void publishProcessStoreUndeployedEvent(String duName){
+ undeployedEvent = new ProcessStoreUndeployedEvent(duName);
+ _clusterManager.publishProcessStoreEvent(undeployedEvent);
+ }
+
+ /**
+ * Use to unregister processes when deployment unit is undeployed
+ * @param duName
+ * @return
+ */
+ public Collection<QName> undeployProcesses(final String duName) {
+ Collection<QName> undeployed = super.undeployProcesses(duName);
+ loaded.keySet().removeAll(undeployed);
+ return undeployed;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index d6f76f3..77afe5a 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -903,4 +903,26 @@ public class ProcessStoreImpl implements ProcessStore {
protected Map<QName, ProcessConfImpl> getProcessesMap() {
return _processes;
}
+
+ protected Collection<QName> undeployProcesses(final String duName) {
+ Collection<QName> undeployed = Collections.emptyList();
+ DeploymentUnitDir du;
+ _rw.writeLock().lock();
+ try {
+ du = _deploymentUnits.remove(duName);
+ if (du != null) {
+ undeployed = toPids(du.getProcessNames(), du.getVersion());
+ }
+
+ for (QName pn : undeployed) {
+ fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));
+ __log.info(__msgs.msgProcessUndeployed(pn));
+ }
+
+ _processes.keySet().removeAll(undeployed);
+ } finally {
+ _rw.writeLock().unlock();
+ }
+ return undeployed;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 2e6868f..beba779 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -29,9 +29,10 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
+import org.apache.ode.store.ClusterProcessStoreImpl;
/**
* This class implements necessary methods to build the cluster using hazelcast
@@ -42,14 +43,16 @@ public class HazelcastClusterImpl implements ClusterManager {
private HazelcastInstance _hazelcastInstance;
private boolean isMaster = false;
private Member leader;
- private Member deployInitiator;
+ private Member eventInitiator;
private IMap<String, String> lock_map;
private ITopic<Object> clusterMessageTopic;
private ClusterProcessStoreImpl _clusterProcessStore;
public void init(File configRoot) {
+
/*First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
Else it will load the hazelcast.xml file using FileSystemXmlConfig()*/
+
String hzConfig = System.getProperty("hazelcast.config");
if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
else {
@@ -79,7 +82,7 @@ public class HazelcastClusterImpl implements ClusterManager {
}
public boolean lock(String key) {
- lock_map.putIfAbsent(key,key);
+ lock_map.putIfAbsent(key, key);
lock_map.lock(key);
boolean state = lock_map.isLocked(key);
__log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
@@ -98,9 +101,9 @@ public class HazelcastClusterImpl implements ClusterManager {
}
public boolean tryLock(String key) {
- lock_map.putIfAbsent(key,key);
+ lock_map.putIfAbsent(key, key);
boolean state = lock_map.tryLock(key);
- __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state );
+ __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
return state;
}
@@ -121,9 +124,9 @@ public class HazelcastClusterImpl implements ClusterManager {
}
}
- public void publishProcessStoreEvent(Object deployedEvent) {
- deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
- clusterMessageTopic.publish(deployedEvent);
+ public void publishProcessStoreEvent(Object event) {
+ eventInitiator = _hazelcastInstance.getCluster().getLocalMember();
+ clusterMessageTopic.publish(event);
}
@@ -134,20 +137,30 @@ public class HazelcastClusterImpl implements ClusterManager {
}
}
- public void handleEvent(Object message) {
+ private void handleEvent(Object message) {
if (message instanceof ProcessStoreDeployedEvent) {
ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
- if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
+ if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) {
String duName = event.deploymentUnit;
__log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
_clusterProcessStore.publishService(duName);
- } else deployInitiator = null;
+ } else eventInitiator = null;
+ }
+
+ else if (message instanceof ProcessStoreUndeployedEvent) {
+ ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
+
+ if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) {
+ String duName = event.deploymentUnit;
+ __log.info("Receive undeployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
+ _clusterProcessStore.undeployProcesses(duName);
+ } else eventInitiator = null;
}
}
- public void markAsMaster() {
+ private void markAsMaster() {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember()) {
isMaster = true;