You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:03 UTC

[08/30] ode git commit: redesigning phase 2

redesigning phase 2


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/521d640d
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/521d640d
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/521d640d

Branch: refs/heads/ODE-563
Commit: 521d640d672133d3864a61c54ba2b59e70e35a4b
Parents: afa36ee
Author: suba <su...@cse.mrt.ac.lk>
Authored: Tue Jun 16 17:20:04 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Tue Jun 16 17:20:04 2015 +0530

----------------------------------------------------------------------
 Rakefile                                        | 10 +--
 .../java/org/apache/ode/axis2/ODEServer.java    |  6 +-
 .../ode/axis2/deploy/DeploymentPoller.java      | 29 ++++----
 .../ode/axis2/service/DeploymentWebService.java | 25 +++++--
 .../apache/ode/bpel/clapi/ClusterManager.java   | 22 +++++++
 .../bpel/clapi/ProcessStoreDeployedEvent.java   | 40 ++++++++++++
 .../ode/store/ClusterProcessStoreImpl.java      | 43 +++---------
 .../hazelcast/HazelcastClusterImpl.java         | 69 ++++++++++++++++----
 .../hazelcast/HazelcastConstants.java           |  1 -
 .../hazelcast/HazelcastInstanceConfig.java      | 56 ----------------
 10 files changed, 175 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 06dcbd5..7c0fa67 100644
--- a/Rakefile
+++ b/Rakefile
@@ -86,7 +86,7 @@ define "ode" do
       "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents", "clustering"),
       AXIOM, AXIS2_ALL, COMMONS.lang, COMMONS.collections, COMMONS.httpclient, COMMONS.lang,
       DERBY, GERONIMO.kernel, GERONIMO.transaction, JAVAX.activation, JAVAX.servlet, JAVAX.stream,
-      JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J, HAZELCAST
+      JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J
 
     test.exclude 'org.apache.ode.axis2.management.*'
     test.with project("tools"), AXIOM, JAVAX.javamail, COMMONS.codec, COMMONS.httpclient, XERCES, WOODSTOX
@@ -166,7 +166,7 @@ define "ode" do
 
   desc "ODE APIs"
   define "bpel-api" do
-    compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J, HAZELCAST
+    compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J
     package :jar
   end
 
@@ -208,7 +208,7 @@ define "ode" do
 
   desc "ODE Clustering"
    define "clustering" do
-     compile.with projects("bpel-api"),HAZELCAST, COMMONS.logging
+     compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
      package :jar
    end
 
@@ -270,8 +270,8 @@ define "ode" do
   desc "ODE Process Store"
   define "bpel-store" do
     compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas", "bpel-epr",
-      "dao-hibernate", "dao-jpa", "clustering", "utils"),
-      JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J,HAZELCAST
+      "dao-hibernate", "dao-jpa", "utils"),
+      JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J
     compile { open_jpa_enhance }
     resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo")
 

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 26489d2..f0ad470 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -234,7 +234,7 @@ public class ODEServer {
 
         try {
             __log.debug("Initializing Deployment Web Service");
-            new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath());
+            new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(),this);
         } catch (Exception e) {
             throw new ServletException(e);
         }
@@ -490,8 +490,8 @@ public class ODEServer {
     private void initClustering() {
         String clusterImplName = _odeConfig.getClusteringImplClass();
         try {
-            Class<?> clustering_class = this.getClass().getClassLoader().loadClass(clusterImplName);
-            _clusterManager = (ClusterManager) clustering_class.newInstance();
+            Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
+            _clusterManager = (ClusterManager) clusterImplClass.newInstance();
         } catch (Exception ex) {
             __log.error(ex);
         }

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index 66890ba..ccb029b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -54,7 +54,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import  org.apache.ode.clustering.hazelcast.HazelcastClusterImpl;
+import org.apache.ode.bpel.clapi.ClusterManager;
 
 /**
  * Polls a directory for the deployment of a new deployment unit.
@@ -100,7 +100,7 @@ public class DeploymentPoller {
     public DeploymentPoller(File deployDir, final ODEServer odeServer) {
         _odeServer = odeServer;
         _deployDir = deployDir;
-        clusterEnabled = _odeServer.getClusteringState();
+        clusterEnabled = _odeServer.getIsCluteringEnabled();
         if (!_deployDir.exists()) {
             boolean isDeployDirCreated = _deployDir.mkdir();
             if (!isDeployDirCreated) {
@@ -140,10 +140,13 @@ public class DeploymentPoller {
         // Checking for new deployment directories
         if (isDeploymentFromODEFileSystemAllowed() && files != null) {
             for (File file : files) {
-                __log.info("Trying to access the lock for " +file.getName());
-                duLocked = lock(file.getName());
-                try {
-                    if (duLocked) {
+                String test = file.getName();
+                __log.info("Trying to access the lock for " + test);
+                __log.info("Test null key value " +test);
+                duLocked = pollerTryLock(test);
+
+                if (duLocked) {
+                    try {
                         File deployXml = new File(file, "deploy.xml");
                         File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
 
@@ -185,10 +188,10 @@ public class DeploymentPoller {
                         } catch (Exception e) {
                             __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
                         }
+                    } finally {
+                        __log.info("Trying to release the lock for " + file.getName());
+                        unlock(file.getName());
                     }
-                } finally {
-                    __log.info("Trying to release the lock for " + file.getName());
-                    unlock(file.getName());
                 }
             }
         }
@@ -341,16 +344,16 @@ public class DeploymentPoller {
     }
 
     //Implementation of IMap key Lock
-    public boolean lock(String key) {
+    private boolean pollerTryLock(String key) {
         if(clusterEnabled) {
-            return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.lock(key);
+            return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key);
         }
         else return true;
     }
 
-    public boolean unlock(String key) {
+    private boolean unlock(String key) {
         if(clusterEnabled) {
-            return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.unlock(key);
+            return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);
         }
         else return true;
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index bd35167..1951cf5 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -55,6 +55,7 @@ import org.apache.axis2.util.Utils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.lang.StringUtils;
+import org.apache.ode.axis2.ODEServer;
 import org.apache.ode.axis2.OdeFault;
 import org.apache.ode.axis2.deploy.DeploymentPoller;
 import org.apache.ode.axis2.hooks.ODEAxisService;
@@ -76,9 +77,11 @@ public class DeploymentWebService {
     private final OMNamespace _deployapi;
 
     private File _deployPath;
+    private  ODEServer _odeServer;
     private DeploymentPoller _poller;
     private ProcessStore _store;
 
+    private boolean clusterEnabled;
 
     public DeploymentWebService() {
         _pmapi = OMAbstractFactory.getOMFactory().createOMNamespace("http://www.apache.org/ode/pmapi","pmapi");
@@ -86,10 +89,12 @@ public class DeploymentWebService {
     }
 
     public void enableService(AxisConfiguration axisConfig, ProcessStore store,
-                              DeploymentPoller poller, String rootpath, String workPath) throws AxisFault, WSDLException {
+                              DeploymentPoller poller, String rootpath, String workPath, ODEServer odeServer) throws AxisFault, WSDLException {
         _deployPath = new File(workPath, "processes");
         _store = store;
         _poller = poller;
+        _odeServer = odeServer;
+        clusterEnabled = _odeServer.getIsCluteringEnabled();
 
         Definition def;
         WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();
@@ -170,7 +175,7 @@ public class DeploymentWebService {
                         __log.info("Trying to access the lock for " + dest.getName());
 
                         //lock on deployment unit directory name
-                        duLocked = _poller.lock(dest.getName());
+                        duLocked = lock(dest.getName());
 
                         if (duLocked) {
                             boolean createDir = dest.mkdir();
@@ -214,7 +219,7 @@ public class DeploymentWebService {
                                 sendResponse(factory, messageContext, "deployResponse", response);
                             } finally {
                                 __log.info("Trying to release the lock for " + dest.getName());
-                                _poller.unlock(dest.getName());
+                                unlock(dest.getName());
                             }
                         }
                     } finally {
@@ -366,6 +371,18 @@ public class DeploymentWebService {
         out.close();
     }
 
+    // Cluster-wide lock on a key; delegates to the ClusterManager when clustering is enabled
+    private boolean lock(String key) {
+        if(clusterEnabled) {
+            return _odeServer.getBpelServer().getContexts().clusterManager.lock(key);
+        }
+        else return true;
+    }
 
-
+    private boolean unlock(String key) {
+        if(clusterEnabled) {
+            return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);
+        }
+        else return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 4a0aded..a1fe194 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -54,6 +54,28 @@ public interface ClusterManager {
      */
     boolean unlock(String key);
 
+    /**
+     * Tries to acquire the lock for the specified key.
+     * @param key
+     * @return
+     */
+    boolean tryLock(String key);
 
+    /**
+     * Sets the process store object that is used for clustering.
+     * @param ps
+     */
+    void setClusterProcessStore(Object ps);
 
+    /**
+     * Publishes a deploy event to the cluster; called by the node that initiated the deployment.
+     * @param event
+     */
+    void publishProcessStoreEvent(Object event);
+
+    /**
+     * Handles an event received from the cluster according to its type.
+     * @param message
+     */
+    void handleEvent(Object message);
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
new file mode 100644
index 0000000..a623d47
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public class ProcessStoreDeployedEvent implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public final String deploymentUnit;
+
+    public final String info;
+
+    public ProcessStoreDeployedEvent(String deploymentUnit) {
+        this.info = "Deployment Event";
+        this.deploymentUnit = deploymentUnit;
+    }
+
+    @Override
+    public String toString() {
+        return "{ProcessStoreDeployedEvent#" + deploymentUnit +"}";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 22ba2cd..6f35110 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -20,9 +20,9 @@ package org.apache.ode.store;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
 import org.apache.ode.bpel.iapi.ProcessState;
-import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
 import org.apache.ode.il.config.OdeConfigProperties;
 
@@ -33,24 +33,17 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.hazelcast.core.*;
-
 public class ClusterProcessStoreImpl extends ProcessStoreImpl{
     private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
 
-    private HazelcastInstance _hazelcastInstance;
-    private Member deployInitiator;
-    private ITopic<String> clusterMessageTopic;
     private final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
+    private ClusterManager _clusterManager;
+    private  ProcessStoreDeployedEvent deployedEvent;
 
-
-    public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl hazelcastClusterImpl) {
+    public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
         super(eprContext,ds,persistenceType,props,createDatamodel);
-        _hazelcastInstance = hazelcastClusterImpl.getHazelcastInstance();
-
-        // Register for listening to message listener
-        clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg");
-        clusterMessageTopic.addMessageListener(new ClusterMessageListener());
+        _clusterManager = clusterManager;
+        _clusterManager.setClusterProcessStore(this);
     }
 
     public Collection<QName> deploy(final File deploymentUnitDirectory) {
@@ -63,9 +56,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
         return deployed;
     }
 
-    public void publishProcessStoreDeployedEvent(String duName){
-        deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
-        clusterMessageTopic.publish("Deployed " +duName);
+    private void publishProcessStoreDeployedEvent(String duName){
+        deployedEvent = new ProcessStoreDeployedEvent(duName);
+        _clusterManager.publishProcessStoreEvent(deployedEvent);
     }
 
     public void publishService(final String duName) {
@@ -110,22 +103,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
         //loadAll();
     }
 
-    class ClusterMessageListener implements MessageListener<String> {
-        @Override
-        public void onMessage(Message<String> msg) {
-            String message = msg.getMessageObject();
-            String arr[] = message.split(" ", 2);
-            String duName = arr[1];
-            if(message.contains("Deployed ")) {
-                if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
-                    __log.info("Receive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +" for " +duName);
-                    publishService(duName);
-                }
-                else deployInitiator = null;
-            }
-        }
-    }
-
     private Pattern getPreviousPackageVersionPattern(String duName) {
         String[] nameParts = duName.split("/");
         /* Replace the version number (if any) with regexp to match any version number */

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 6ae701b..2e6868f 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -19,38 +19,49 @@
 package org.apache.ode.clustering.hazelcast;
 
 import com.hazelcast.core.*;
+import com.hazelcast.config.FileSystemXmlConfig;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
 
 /**
  * This class implements necessary methods to build the cluster using hazelcast
  */
-public class HazelcastClusterImpl implements ClusterManager{
+public class HazelcastClusterImpl implements ClusterManager {
     private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
 
     private HazelcastInstance _hazelcastInstance;
     private boolean isMaster = false;
     private Member leader;
-
+    private Member deployInitiator;
     private IMap<String, String> lock_map;
+    private ITopic<Object> clusterMessageTopic;
+    private ClusterProcessStoreImpl _clusterProcessStore;
 
     public void init(File configRoot) {
-        //First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
-        //Else it will load the hazelcast.xml file using FileSystemXmlConfig()
+        /* First, looks for the hazelcast.config system property. If it is set, its value is used as the config path.
+        Otherwise, it loads the hazelcast.xml file from configRoot using FileSystemXmlConfig(). */
         String hzConfig = System.getProperty("hazelcast.config");
         if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
         else {
             File hzXml = new File(configRoot, "hazelcast.xml");
             if (!hzXml.isFile())
                 __log.error("hazelcast.xml does not exist or is not a file");
-            else _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+            else
+                try {
+                    _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+                } catch (FileNotFoundException fnf) {
+                    __log.error(fnf);
+                }
         }
 
         if (_hazelcastInstance != null) {
@@ -60,10 +71,15 @@ public class HazelcastClusterImpl implements ClusterManager{
             __log.info("Registering HZ localMember ID " + localMember);
             markAsMaster();
             lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_LOCK_MAP);
+
+            // Register a listener for messages published on the "deployedMsg" topic
+            clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg");
+            clusterMessageTopic.addMessageListener(new ClusterMessageListener());
         }
     }
 
     public boolean lock(String key) {
+        lock_map.putIfAbsent(key,key);
         lock_map.lock(key);
         boolean state = lock_map.isLocked(key);
         __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
@@ -81,6 +97,13 @@ public class HazelcastClusterImpl implements ClusterManager{
         return state;
     }
 
+    public boolean tryLock(String key) {
+        lock_map.putIfAbsent(key,key);
+        boolean state = lock_map.tryLock(key);
+        __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state );
+        return state;
+    }
+
     class ClusterMemberShipListener implements MembershipListener {
         @Override
         public void memberAdded(MembershipEvent membershipEvent) {
@@ -90,10 +113,6 @@ public class HazelcastClusterImpl implements ClusterManager{
         @Override
         public void memberRemoved(MembershipEvent membershipEvent) {
             markAsMaster();
-            // Allow Leader to update distributed map.
-            if (isMaster) {
-                String leftMemberID = getHazelCastNodeID(membershipEvent.getMember());
-            }
         }
 
         @Override
@@ -102,6 +121,32 @@ public class HazelcastClusterImpl implements ClusterManager{
         }
     }
 
+    public void publishProcessStoreEvent(Object deployedEvent) {
+        deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
+        clusterMessageTopic.publish(deployedEvent);
+    }
+
+
+    class ClusterMessageListener implements MessageListener<Object> {
+        @Override
+        public void onMessage(Message<Object> msg) {
+            handleEvent(msg.getMessageObject());
+        }
+    }
+
+    public void handleEvent(Object message) {
+        if (message instanceof ProcessStoreDeployedEvent) {
+            ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
+
+            if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
+                String duName = event.deploymentUnit;
+                __log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
+                _clusterProcessStore.publishService(duName);
+            } else deployInitiator = null;
+        }
+
+    }
+
     public void markAsMaster() {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
         if (leader.localMember()) {
@@ -114,7 +159,9 @@ public class HazelcastClusterImpl implements ClusterManager{
         return isMaster;
     }
 
-    public HazelcastInstance getHazelcastInstance() {
-        return _hazelcastInstance;
+    public void setClusterProcessStore(Object store) {
+        if (store instanceof ClusterProcessStoreImpl)
+            _clusterProcessStore = (ClusterProcessStoreImpl) store;
     }
 }
+

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
index e201b70..f9d1004 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -21,7 +21,6 @@ package org.apache.ode.clustering.hazelcast;
  * Constants used in Hazelcast based clustering implementation
  */
 public final class HazelcastConstants {
-    public static final String ODE_CLUSTER_NODE_MAP = "ODE_NODE_ID_MAP";
     public static final String ODE_CLUSTER_LOCK_MAP = "ODE_LOCK_MAP";
 
     private HazelcastConstants() {

http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
deleted file mode 100644
index 9e8c59b..0000000
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.ode.clustering.hazelcast;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.FileSystemXmlConfig;
-import com.hazelcast.core.*;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-/**
- * This is to create hazelcast instance.
- * It sets the config object using hazelcast.xml file.First, it looks for the hazelcast.config system property. If it is set, its value is used as the path.
- * Else it will load the hazelcast.xml file using FileSystemXmlConfig()
- */
-public class HazelcastInstanceConfig {
-    private HazelcastInstance hazelcastInstance;
-
-    public HazelcastInstanceConfig() {
-        hazelcastInstance = Hazelcast.newHazelcastInstance();
-    }
-
-    /**
-     *
-     * @param hzXml
-     */
-    public HazelcastInstanceConfig(File hzXml) {
-        try {
-            Config config = new FileSystemXmlConfig(hzXml);
-            hazelcastInstance = Hazelcast.newHazelcastInstance(config);
-        } catch (FileNotFoundException fnf) {
-        }
-    }
-
-    public HazelcastInstance getHazelcastInstance() {
-        return hazelcastInstance;
-    }
-}
-