You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:04 UTC

[09/30] ode git commit: completed version-1 of deploying processes in the cluster

completed version-1 of deploying processes in the cluster


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/9cb75820
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/9cb75820
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/9cb75820

Branch: refs/heads/ODE-563
Commit: 9cb75820df9f8a87ebb163e4c1c45d7affd6291f
Parents: 521d640
Author: suba <su...@cse.mrt.ac.lk>
Authored: Wed Jun 17 12:32:40 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Wed Jun 17 12:32:40 2015 +0530

----------------------------------------------------------------------
 .../webapp/WEB-INF/conf/ode-axis2.properties    |  8 +++-
 .../java/org/apache/ode/axis2/Messages.java     |  4 ++
 .../java/org/apache/ode/axis2/ODEServer.java    | 27 ++++++------
 .../ode/axis2/deploy/DeploymentPoller.java      | 14 +++----
 .../ode/axis2/service/DeploymentWebService.java | 41 +++++++++++++------
 .../apache/ode/bpel/clapi/ClusterManager.java   | 13 +-----
 .../bpel/clapi/ProcessStoreUndeployedEvent.java | 40 ++++++++++++++++++
 .../ode/store/ClusterProcessStoreImpl.java      | 43 ++++++++++++++++----
 .../org/apache/ode/store/ProcessStoreImpl.java  | 22 ++++++++++
 .../hazelcast/HazelcastClusterImpl.java         | 37 +++++++++++------
 10 files changed, 183 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
----------------------------------------------------------------------
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
index 253037c..03ac79c 100644
--- a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
@@ -94,4 +94,10 @@ ode-axis2.db.emb.name=derby-jpadb
 
 ## Event listeners
 #ode-axis2.event.listeners=
-#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
\ No newline at end of file
+#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
+
+## Enable clustering
+#ode-axis2.clustering.enabled=true
+
+## Clustering Implementation class.
+#ode-axis2.clustering.impl.class = org.apache.ode.clustering.hazelcast.HazelcastClusterImpl

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/Messages.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/Messages.java b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
index a95c30d..0581c72 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/Messages.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
@@ -58,6 +58,10 @@ public class Messages extends MessageBundle {
         return format("Starting ODE ServiceEngine.");
     }
 
+    public String msgOdeClusteringNotInitialized() { // message ODEServer logs at info level when clustering is not initialized
+        return format("Clustering has not been initialized.");
+    }
+
     public String msgOdeStarted() {
         return format("ODE Service Engine has been started.");
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index f0ad470..51f05dd 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -57,6 +57,7 @@ import org.apache.ode.axis2.deploy.DeploymentPoller;
 import org.apache.ode.axis2.service.DeploymentWebService;
 import org.apache.ode.axis2.service.ManagementService;
 import org.apache.ode.axis2.util.ClusterUrlTransformer;
+import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.connector.BpelServerConnector;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -82,8 +83,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
-import org.apache.ode.bpel.clapi.ClusterManager;
-
 /**
  * Server class called by our Axis hooks to handle all ODE lifecycle management.
  *
@@ -122,6 +121,8 @@ public class ODEServer {
 
     protected Database _db;
 
+    protected ClusterManager _clusterManager;
+
     private DeploymentPoller _poller;
 
     private BpelServerConnector _connector;
@@ -135,10 +136,6 @@ public class ODEServer {
     
     public Runnable txMgrCreatedCallback;
 
-    private ClusterManager _clusterManager;
-
-    private String clusteringState = "";
-
     private boolean isClusteringEnabled;
 
     public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
@@ -193,9 +190,10 @@ public class ODEServer {
             txMgrCreatedCallback.run();
         }
 
-        clusteringState = _odeConfig.getClusteringState();
-        if (isClusteringEnabled()) initClustering();
-        else __log.info("Clustering has not been initialized");
+        String clusteringState = _odeConfig.getClusteringState();
+        if (clusteringState != null && isClusteringEnabled(clusteringState)) {
+            initClustering();
+        } else __log.info(__msgs.msgOdeClusteringNotInitialized());
 
         __log.debug("Creating data source.");
         initDataSource();
@@ -384,6 +382,11 @@ public class ODEServer {
                 _txMgr = null;
             }
 
+            if (_clusterManager != null) {
+                __log.debug("shutting down cluster manager.");
+                _clusterManager = null;
+            }
+
             if (_connector != null) {
                 try {
                     __log.debug("shutdown BpelConnector");
@@ -468,7 +471,7 @@ public class ODEServer {
         }
     }
 
-    public boolean isClusteringEnabled() {
+    private boolean isClusteringEnabled(String clusteringState) {
         boolean state;
         if (clusteringState.equals("true")) state = true;
         else state = false;
@@ -476,7 +479,7 @@ public class ODEServer {
         return state;
     }
 
-    public void  setClustering (boolean state) {
+    private void  setClustering (boolean state) {
         isClusteringEnabled = state;
     }
 
@@ -493,7 +496,7 @@ public class ODEServer {
             Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
             _clusterManager = (ClusterManager) clusterImplClass.newInstance();
         } catch (Exception ex) {
-            __log.error(ex);
+            __log.error("Error while loading class : " +clusterImplName ,ex);
         }
         _clusterManager.init(_configRoot);
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index ccb029b..9964af0 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -41,6 +41,7 @@ package org.apache.ode.axis2.deploy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
 import org.apache.ode.utils.WatchDog;
@@ -54,8 +55,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ode.bpel.clapi.ClusterManager;
-
 /**
  * Polls a directory for the deployment of a new deployment unit.
  */
@@ -140,10 +139,9 @@ public class DeploymentPoller {
         // Checking for new deployment directories
         if (isDeploymentFromODEFileSystemAllowed() && files != null) {
             for (File file : files) {
-                String test = file.getName();
-                __log.info("Trying to access the lock for " + test);
-                __log.info("Test null key value " +test);
-                duLocked = pollerTryLock(test);
+                String duName = file.getName();
+                __log.info("Trying to acquire the lock for " + duName);
+                duLocked = pollerTryLock(duName);
 
                 if (duLocked) {
                     try {
@@ -343,7 +341,9 @@ public class DeploymentPoller {
         }
     }
 
-    //Implementation of IMap key Lock
+    /**
+     * Used by the poller to try to acquire the lock for the given key
+     */
     private boolean pollerTryLock(String key) {
         if(clusterEnabled) {
             return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key);

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index 1951cf5..89c5a63 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -172,7 +172,7 @@ public class DeploymentWebService {
                         _poller.hold();
 
                         File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion());
-                        __log.info("Trying to access the lock for " + dest.getName());
+                        __log.info("Trying to acquire the lock for deploying: " + dest.getName());
 
                         //lock on deployment unit directory name
                         duLocked = lock(dest.getName());
@@ -218,7 +218,7 @@ public class DeploymentWebService {
                                 }
                                 sendResponse(factory, messageContext, "deployResponse", response);
                             } finally {
-                                __log.info("Trying to release the lock for " + dest.getName());
+                                __log.info("Trying to release the lock for deploying: " + dest.getName());
                                 unlock(dest.getName());
                             }
                         }
@@ -243,20 +243,30 @@ public class DeploymentWebService {
                         // Put the poller on hold to avoid undesired side effects
                         _poller.hold();
 
-                        Collection<QName> undeployed = _store.undeploy(deploymentDir);
+                        __log.info("Trying to acquire the lock for undeploying: " + deploymentDir.getName());
+                        duLocked = lock(deploymentDir.getName());
 
-                        File deployedMarker = new File(deploymentDir + ".deployed");
-                        boolean isDeleted = deployedMarker.delete();
+                        if (duLocked) {
+                            try {
+                                Collection<QName> undeployed = _store.undeploy(deploymentDir);
 
-                        if (!isDeleted)
-                            __log.error("Error while deleting file " + deployedMarker.getName());
+                                File deployedMarker = new File(deploymentDir + ".deployed");
+                                boolean isDeleted = deployedMarker.delete();
 
-                        FileUtils.deepDelete(deploymentDir);
+                                if (!isDeleted)
+                                    __log.error("Error while deleting file " + deployedMarker.getName());
 
-                        OMElement response = factory.createOMElement("response", null);
-                        response.setText("" + (undeployed.size() > 0));
-                        sendResponse(factory, messageContext, "undeployResponse", response);
-                        _poller.markAsUndeployed(deploymentDir);
+                                FileUtils.deepDelete(deploymentDir);
+
+                                OMElement response = factory.createOMElement("response", null);
+                                response.setText("" + (undeployed.size() > 0));
+                                sendResponse(factory, messageContext, "undeployResponse", response);
+                                _poller.markAsUndeployed(deploymentDir);
+                            } finally {
+                                __log.info("Trying to release the lock for undeploying: " + deploymentDir.getName());
+                                unlock(deploymentDir.getName());
+                            }
+                        }
                     } finally {
                         _poller.release();
                     }
@@ -371,7 +381,9 @@ public class DeploymentWebService {
         out.close();
     }
 
-    //Implementation of IMap key Lock
+    /**
+     * Acquires the lock for a deployment unit when deploying or undeploying through the web service
+     */
     private boolean lock(String key) {
         if(clusterEnabled) {
             return _odeServer.getBpelServer().getContexts().clusterManager.lock(key);
@@ -379,6 +391,9 @@ public class DeploymentWebService {
         else return true;
     }
 
+    /**
+     * Releases the lock once the deploy or undeploy operation has completed
+     */
     private boolean unlock(String key) {
         if(clusterEnabled) {
             return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index a1fe194..df4342e 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -30,12 +30,7 @@ public interface ClusterManager {
     void init(File file);
 
     /**
-     * Check whether current node is the leader or not.
-     */
-     void markAsMaster();
-
-    /**
-     * Return isMaster
+     * Return whether the local member is Master or not
      * @return
      */
     boolean getIsMaster();
@@ -72,10 +67,4 @@ public interface ClusterManager {
      * @param event
      */
     void publishProcessStoreEvent(Object event);
-
-    /**
-     * Handle event according to received event
-     * @param message
-     */
-    void handleEvent(Object message);
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
new file mode 100644
index 0000000..347312f
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public class ProcessStoreUndeployedEvent implements Serializable { // serializable event announcing that a deployment unit was undeployed
+    private static final long serialVersionUID = 1L;
+    /** Name of the undeployed deployment unit (ClusterProcessStoreImpl.undeploy passes the deployment directory's name). */
+    public final String deploymentUnit;
+    /** Fixed descriptive label; always "Undeployment Event". */
+    public final String info;
+    /** Creates the event for the deployment unit with the given name. */
+    public ProcessStoreUndeployedEvent(String deploymentUnit) {
+        this.info = "Undeployment Event";
+        this.deploymentUnit = deploymentUnit;
+    }
+    /** Renders the event as {ProcessStoreUndeployedEvent#deploymentUnit}. */
+    @Override
+    public String toString() {
+        return "{ProcessStoreUndeployedEvent#" + deploymentUnit +"}";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 6f35110..551fd72 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
 import org.apache.ode.bpel.iapi.ProcessState;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
 import org.apache.ode.il.config.OdeConfigProperties;
@@ -36,9 +37,10 @@ import java.util.regex.Pattern;
 public class ClusterProcessStoreImpl extends ProcessStoreImpl{
     private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
 
-    private final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
+    private final Map<QName, ProcessConfImpl> loaded = new HashMap<QName, ProcessConfImpl>();
     private ClusterManager _clusterManager;
     private  ProcessStoreDeployedEvent deployedEvent;
+    private  ProcessStoreUndeployedEvent undeployedEvent;
 
     public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
         super(eprContext,ds,persistenceType,props,createDatamodel);
@@ -49,8 +51,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
     public Collection<QName> deploy(final File deploymentUnitDirectory) {
         Collection<QName> deployed = super.deploy(deploymentUnitDirectory);
         Map<QName, ProcessConfImpl> _processes = getProcessesMap();
-        for (QName key :_processes.keySet()) {
-            if(!loaded.contains(_processes.get(key))) loaded.add(_processes.get(key));
+        for (QName key : deployed) {
+            loaded.put(key,_processes.get(key));
         }
         publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName());
         return deployed;
@@ -67,8 +69,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
 
         Pattern duNamePattern = getPreviousPackageVersionPattern(duName);
 
-        for (Iterator<ProcessConfImpl> iterator = loaded.iterator(); iterator.hasNext();) {
-            ProcessConfImpl pconf = iterator.next();
+        for (QName key : loaded.keySet()) {
+            ProcessConfImpl pconf = loaded.get(key);
             Matcher matcher = duNamePattern.matcher(pconf.getPackage());
             if (matcher.matches() && pconf.getState().equals(state)) {
                   pconf.setState(ProcessState.RETIRED);
@@ -82,9 +84,10 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
                     DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
                     if (dudao != null) {
                         List<ProcessConfImpl> load = load(dudao);
-                        loaded.addAll(load);
+                        for(ProcessConfImpl p : load) {
+                        loaded.put(p.getProcessId(),p);
+                        }
                         confs.addAll(load);
-
                     }
                     return null;
                 }
@@ -97,10 +100,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
             try {
                 fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());
             } catch (Exception except) {
-                __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except);
+                __log.error("Error with process retiring or activating : pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except);
             }
         }
-        //loadAll();
     }
 
     private Pattern getPreviousPackageVersionPattern(String duName) {
@@ -116,4 +118,27 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
         Pattern duNamePattern = Pattern.compile(duNameRegExp.toString());
         return duNamePattern;
     }
+
+    public Collection<QName> undeploy(final File dir) { // undeploys via the parent store, then publishes an undeploy event through the cluster manager
+        Collection<QName> undeployed = super.undeploy(dir);
+        loaded.keySet().removeAll(undeployed); // drop the undeployed process ids from the loaded map
+        publishProcessStoreUndeployedEvent(dir.getName()); // the event carries the deployment directory's name
+        return undeployed;
+    }
+
+    private void publishProcessStoreUndeployedEvent(String duName){ // publishes a ProcessStoreUndeployedEvent for duName via the cluster manager
+        undeployedEvent = new ProcessStoreUndeployedEvent(duName); // the event is also retained in the undeployedEvent field
+        _clusterManager.publishProcessStoreEvent(undeployedEvent);
+    }
+
+    /**
+     * Unregisters the processes of an undeployed deployment unit and drops them from the loaded map.
+     * @param duName name of the deployment unit whose processes are unregistered
+     * @return ids of the processes that were unregistered; empty when the parent store reports none
+     */
+    public Collection<QName> undeployProcesses(final String duName) {
+        Collection<QName> undeployed = super.undeployProcesses(duName);
+        loaded.keySet().removeAll(undeployed);
+        return undeployed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index d6f76f3..77afe5a 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -903,4 +903,26 @@ public class ProcessStoreImpl implements ProcessStore {
     protected Map<QName, ProcessConfImpl> getProcessesMap() {
         return _processes;
     }
+
+    protected  Collection<QName> undeployProcesses(final String duName) { // removes deployment unit duName and its processes from _deploymentUnits and _processes
+        Collection<QName> undeployed = Collections.emptyList(); // returned unchanged when duName is not registered
+        DeploymentUnitDir du;
+        _rw.writeLock().lock(); // the whole removal runs under the write lock, released in finally
+        try {
+            du = _deploymentUnits.remove(duName);
+            if (du != null) {
+                undeployed = toPids(du.getProcessNames(), du.getVersion());
+            }
+            // du is dereferenced below only when undeployed is non-empty, which requires du != null
+            for (QName pn : undeployed) {
+                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));
+                __log.info(__msgs.msgProcessUndeployed(pn));
+            }
+            // finally drop the undeployed process ids from the process map
+            _processes.keySet().removeAll(undeployed);
+        } finally {
+            _rw.writeLock().unlock();
+        }
+        return undeployed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 2e6868f..beba779 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -29,9 +29,10 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
+import org.apache.ode.store.ClusterProcessStoreImpl;
 
 /**
  * This class implements necessary methods to build the cluster using hazelcast
@@ -42,14 +43,16 @@ public class HazelcastClusterImpl implements ClusterManager {
     private HazelcastInstance _hazelcastInstance;
     private boolean isMaster = false;
     private Member leader;
-    private Member deployInitiator;
+    private Member eventInitiator;
     private IMap<String, String> lock_map;
     private ITopic<Object> clusterMessageTopic;
     private ClusterProcessStoreImpl _clusterProcessStore;
 
     public void init(File configRoot) {
+
         /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
         Else it will load the hazelcast.xml file using FileSystemXmlConfig()*/
+
         String hzConfig = System.getProperty("hazelcast.config");
         if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
         else {
@@ -79,7 +82,7 @@ public class HazelcastClusterImpl implements ClusterManager {
     }
 
     public boolean lock(String key) {
-        lock_map.putIfAbsent(key,key);
+        lock_map.putIfAbsent(key, key);
         lock_map.lock(key);
         boolean state = lock_map.isLocked(key);
         __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
@@ -98,9 +101,9 @@ public class HazelcastClusterImpl implements ClusterManager {
     }
 
     public boolean tryLock(String key) {
-        lock_map.putIfAbsent(key,key);
+        lock_map.putIfAbsent(key, key);
         boolean state = lock_map.tryLock(key);
-        __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state );
+        __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
         return state;
     }
 
@@ -121,9 +124,9 @@ public class HazelcastClusterImpl implements ClusterManager {
         }
     }
 
-    public void publishProcessStoreEvent(Object deployedEvent) {
-        deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
-        clusterMessageTopic.publish(deployedEvent);
+    public void publishProcessStoreEvent(Object event) {
+        eventInitiator = _hazelcastInstance.getCluster().getLocalMember();
+        clusterMessageTopic.publish(event);
     }
 
 
@@ -134,20 +137,30 @@ public class HazelcastClusterImpl implements ClusterManager {
         }
     }
 
-    public void handleEvent(Object message) {
+    private void handleEvent(Object message) {
         if (message instanceof ProcessStoreDeployedEvent) {
             ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
 
-            if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
+            if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) {
                 String duName = event.deploymentUnit;
                 __log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
                 _clusterProcessStore.publishService(duName);
-            } else deployInitiator = null;
+            } else eventInitiator = null;
+        }
+
+        else if (message instanceof ProcessStoreUndeployedEvent) {
+            ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
+
+            if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) {
+                String duName = event.deploymentUnit;
+                __log.info("Receive undeployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
+                _clusterProcessStore.undeployProcesses(duName);
+            } else eventInitiator = null;
         }
 
     }
 
-    public void markAsMaster() {
+    private void markAsMaster() {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
         if (leader.localMember()) {
             isMaster = true;