You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:03 UTC
[08/30] ode git commit: redesigning phase 2
redesigning phase 2
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/521d640d
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/521d640d
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/521d640d
Branch: refs/heads/ODE-563
Commit: 521d640d672133d3864a61c54ba2b59e70e35a4b
Parents: afa36ee
Author: suba <su...@cse.mrt.ac.lk>
Authored: Tue Jun 16 17:20:04 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Tue Jun 16 17:20:04 2015 +0530
----------------------------------------------------------------------
Rakefile | 10 +--
.../java/org/apache/ode/axis2/ODEServer.java | 6 +-
.../ode/axis2/deploy/DeploymentPoller.java | 29 ++++----
.../ode/axis2/service/DeploymentWebService.java | 25 +++++--
.../apache/ode/bpel/clapi/ClusterManager.java | 22 +++++++
.../bpel/clapi/ProcessStoreDeployedEvent.java | 40 ++++++++++++
.../ode/store/ClusterProcessStoreImpl.java | 43 +++---------
.../hazelcast/HazelcastClusterImpl.java | 69 ++++++++++++++++----
.../hazelcast/HazelcastConstants.java | 1 -
.../hazelcast/HazelcastInstanceConfig.java | 56 ----------------
10 files changed, 175 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 06dcbd5..7c0fa67 100644
--- a/Rakefile
+++ b/Rakefile
@@ -86,7 +86,7 @@ define "ode" do
"scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents", "clustering"),
AXIOM, AXIS2_ALL, COMMONS.lang, COMMONS.collections, COMMONS.httpclient, COMMONS.lang,
DERBY, GERONIMO.kernel, GERONIMO.transaction, JAVAX.activation, JAVAX.servlet, JAVAX.stream,
- JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J, HAZELCAST
+ JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J
test.exclude 'org.apache.ode.axis2.management.*'
test.with project("tools"), AXIOM, JAVAX.javamail, COMMONS.codec, COMMONS.httpclient, XERCES, WOODSTOX
@@ -166,7 +166,7 @@ define "ode" do
desc "ODE APIs"
define "bpel-api" do
- compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J, HAZELCAST
+ compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J
package :jar
end
@@ -208,7 +208,7 @@ define "ode" do
desc "ODE Clustering"
define "clustering" do
- compile.with projects("bpel-api"),HAZELCAST, COMMONS.logging
+ compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
package :jar
end
@@ -270,8 +270,8 @@ define "ode" do
desc "ODE Process Store"
define "bpel-store" do
compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas", "bpel-epr",
- "dao-hibernate", "dao-jpa", "clustering", "utils"),
- JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J,HAZELCAST
+ "dao-hibernate", "dao-jpa", "utils"),
+ JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J
compile { open_jpa_enhance }
resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo")
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 26489d2..f0ad470 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -234,7 +234,7 @@ public class ODEServer {
try {
__log.debug("Initializing Deployment Web Service");
- new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath());
+ new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(),this);
} catch (Exception e) {
throw new ServletException(e);
}
@@ -490,8 +490,8 @@ public class ODEServer {
private void initClustering() {
String clusterImplName = _odeConfig.getClusteringImplClass();
try {
- Class<?> clustering_class = this.getClass().getClassLoader().loadClass(clusterImplName);
- _clusterManager = (ClusterManager) clustering_class.newInstance();
+ Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
+ _clusterManager = (ClusterManager) clusterImplClass.newInstance();
} catch (Exception ex) {
__log.error(ex);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index 66890ba..ccb029b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -54,7 +54,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl;
+import org.apache.ode.bpel.clapi.ClusterManager;
/**
* Polls a directory for the deployment of a new deployment unit.
@@ -100,7 +100,7 @@ public class DeploymentPoller {
public DeploymentPoller(File deployDir, final ODEServer odeServer) {
_odeServer = odeServer;
_deployDir = deployDir;
- clusterEnabled = _odeServer.getClusteringState();
+ clusterEnabled = _odeServer.getIsCluteringEnabled();
if (!_deployDir.exists()) {
boolean isDeployDirCreated = _deployDir.mkdir();
if (!isDeployDirCreated) {
@@ -140,10 +140,13 @@ public class DeploymentPoller {
// Checking for new deployment directories
if (isDeploymentFromODEFileSystemAllowed() && files != null) {
for (File file : files) {
- __log.info("Trying to access the lock for " +file.getName());
- duLocked = lock(file.getName());
- try {
- if (duLocked) {
+ String test = file.getName();
+ __log.info("Trying to access the lock for " + test);
+ __log.info("Test null key value " +test);
+ duLocked = pollerTryLock(test);
+
+ if (duLocked) {
+ try {
File deployXml = new File(file, "deploy.xml");
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
@@ -185,10 +188,10 @@ public class DeploymentPoller {
} catch (Exception e) {
__log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
}
+ } finally {
+ __log.info("Trying to release the lock for " + file.getName());
+ unlock(file.getName());
}
- } finally {
- __log.info("Trying to release the lock for " + file.getName());
- unlock(file.getName());
}
}
}
@@ -341,16 +344,16 @@ public class DeploymentPoller {
}
//Implementation of IMap key Lock
- public boolean lock(String key) {
+ private boolean pollerTryLock(String key) {
if(clusterEnabled) {
- return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.lock(key);
+ return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key);
}
else return true;
}
- public boolean unlock(String key) {
+ private boolean unlock(String key) {
if(clusterEnabled) {
- return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.unlock(key);
+ return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);
}
else return true;
}
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index bd35167..1951cf5 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -55,6 +55,7 @@ import org.apache.axis2.util.Utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;
+import org.apache.ode.axis2.ODEServer;
import org.apache.ode.axis2.OdeFault;
import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.hooks.ODEAxisService;
@@ -76,9 +77,11 @@ public class DeploymentWebService {
private final OMNamespace _deployapi;
private File _deployPath;
+ private ODEServer _odeServer;
private DeploymentPoller _poller;
private ProcessStore _store;
+ private boolean clusterEnabled;
public DeploymentWebService() {
_pmapi = OMAbstractFactory.getOMFactory().createOMNamespace("http://www.apache.org/ode/pmapi","pmapi");
@@ -86,10 +89,12 @@ public class DeploymentWebService {
}
public void enableService(AxisConfiguration axisConfig, ProcessStore store,
- DeploymentPoller poller, String rootpath, String workPath) throws AxisFault, WSDLException {
+ DeploymentPoller poller, String rootpath, String workPath, ODEServer odeServer) throws AxisFault, WSDLException {
_deployPath = new File(workPath, "processes");
_store = store;
_poller = poller;
+ _odeServer = odeServer;
+ clusterEnabled = _odeServer.getIsCluteringEnabled();
Definition def;
WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();
@@ -170,7 +175,7 @@ public class DeploymentWebService {
__log.info("Trying to access the lock for " + dest.getName());
//lock on deployment unit directory name
- duLocked = _poller.lock(dest.getName());
+ duLocked = lock(dest.getName());
if (duLocked) {
boolean createDir = dest.mkdir();
@@ -214,7 +219,7 @@ public class DeploymentWebService {
sendResponse(factory, messageContext, "deployResponse", response);
} finally {
__log.info("Trying to release the lock for " + dest.getName());
- _poller.unlock(dest.getName());
+ unlock(dest.getName());
}
}
} finally {
@@ -366,6 +371,18 @@ public class DeploymentWebService {
out.close();
}
+ //Implementation of IMap key Lock
+ private boolean lock(String key) {
+ if(clusterEnabled) {
+ return _odeServer.getBpelServer().getContexts().clusterManager.lock(key);
+ }
+ else return true;
+ }
-
+ private boolean unlock(String key) {
+ if(clusterEnabled) {
+ return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key);
+ }
+ else return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 4a0aded..a1fe194 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -54,6 +54,28 @@ public interface ClusterManager {
*/
boolean unlock(String key);
+ /**
+ * Tries to acquire the lock for the specified key.
+ * @param key
+ * @return
+ */
+ boolean tryLock(String key);
+ /**
+ * Sets the process store object that is used for clustering.
+ * @param ps
+ */
+ void setClusterProcessStore(Object ps);
+ /**
+ * Publishes a deployment event to the cluster; called by the deploy initiator.
+ * @param event
+ */
+ void publishProcessStoreEvent(Object event);
+
+ /**
+ * Handles an event received from the cluster according to its type.
+ * @param message
+ */
+ void handleEvent(Object message);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
new file mode 100644
index 0000000..a623d47
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public class ProcessStoreDeployedEvent implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final String deploymentUnit;
+
+ public final String info;
+
+ public ProcessStoreDeployedEvent(String deploymentUnit) {
+ this.info = "Deployment Event";
+ this.deploymentUnit = deploymentUnit;
+ }
+
+ @Override
+ public String toString() {
+ return "{ProcessStoreDeployedEvent#" + deploymentUnit +"}";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 22ba2cd..6f35110 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -20,9 +20,9 @@ package org.apache.ode.store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
import org.apache.ode.bpel.iapi.ProcessState;
-import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.il.config.OdeConfigProperties;
@@ -33,24 +33,17 @@ import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.hazelcast.core.*;
-
public class ClusterProcessStoreImpl extends ProcessStoreImpl{
private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
- private HazelcastInstance _hazelcastInstance;
- private Member deployInitiator;
- private ITopic<String> clusterMessageTopic;
private final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
+ private ClusterManager _clusterManager;
+ private ProcessStoreDeployedEvent deployedEvent;
-
- public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl hazelcastClusterImpl) {
+ public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
super(eprContext,ds,persistenceType,props,createDatamodel);
- _hazelcastInstance = hazelcastClusterImpl.getHazelcastInstance();
-
- // Register for listening to message listener
- clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg");
- clusterMessageTopic.addMessageListener(new ClusterMessageListener());
+ _clusterManager = clusterManager;
+ _clusterManager.setClusterProcessStore(this);
}
public Collection<QName> deploy(final File deploymentUnitDirectory) {
@@ -63,9 +56,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
return deployed;
}
- public void publishProcessStoreDeployedEvent(String duName){
- deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
- clusterMessageTopic.publish("Deployed " +duName);
+ private void publishProcessStoreDeployedEvent(String duName){
+ deployedEvent = new ProcessStoreDeployedEvent(duName);
+ _clusterManager.publishProcessStoreEvent(deployedEvent);
}
public void publishService(final String duName) {
@@ -110,22 +103,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{
//loadAll();
}
- class ClusterMessageListener implements MessageListener<String> {
- @Override
- public void onMessage(Message<String> msg) {
- String message = msg.getMessageObject();
- String arr[] = message.split(" ", 2);
- String duName = arr[1];
- if(message.contains("Deployed ")) {
- if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
- __log.info("Receive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +" for " +duName);
- publishService(duName);
- }
- else deployInitiator = null;
- }
- }
- }
-
private Pattern getPreviousPackageVersionPattern(String duName) {
String[] nameParts = duName.split("/");
/* Replace the version number (if any) with regexp to match any version number */
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 6ae701b..2e6868f 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -19,38 +19,49 @@
package org.apache.ode.clustering.hazelcast;
import com.hazelcast.core.*;
+import com.hazelcast.config.FileSystemXmlConfig;
import java.io.File;
+import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
/**
* This class implements necessary methods to build the cluster using hazelcast
*/
-public class HazelcastClusterImpl implements ClusterManager{
+public class HazelcastClusterImpl implements ClusterManager {
private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
private HazelcastInstance _hazelcastInstance;
private boolean isMaster = false;
private Member leader;
-
+ private Member deployInitiator;
private IMap<String, String> lock_map;
+ private ITopic<Object> clusterMessageTopic;
+ private ClusterProcessStoreImpl _clusterProcessStore;
public void init(File configRoot) {
- //First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
- //Else it will load the hazelcast.xml file using FileSystemXmlConfig()
+ /* First, look for the hazelcast.config system property. If it is set, its value is used as the path.
+ Otherwise, load the hazelcast.xml file from the config root using FileSystemXmlConfig(). */
String hzConfig = System.getProperty("hazelcast.config");
if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
else {
File hzXml = new File(configRoot, "hazelcast.xml");
if (!hzXml.isFile())
__log.error("hazelcast.xml does not exist or is not a file");
- else _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+ else
+ try {
+ _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+ } catch (FileNotFoundException fnf) {
+ __log.error(fnf);
+ }
}
if (_hazelcastInstance != null) {
@@ -60,10 +71,15 @@ public class HazelcastClusterImpl implements ClusterManager{
__log.info("Registering HZ localMember ID " + localMember);
markAsMaster();
lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_LOCK_MAP);
+
+ // Register a message listener on the cluster message topic
+ clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg");
+ clusterMessageTopic.addMessageListener(new ClusterMessageListener());
}
}
public boolean lock(String key) {
+ lock_map.putIfAbsent(key,key);
lock_map.lock(key);
boolean state = lock_map.isLocked(key);
__log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
@@ -81,6 +97,13 @@ public class HazelcastClusterImpl implements ClusterManager{
return state;
}
+ public boolean tryLock(String key) {
+ lock_map.putIfAbsent(key,key);
+ boolean state = lock_map.tryLock(key);
+ __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state );
+ return state;
+ }
+
class ClusterMemberShipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
@@ -90,10 +113,6 @@ public class HazelcastClusterImpl implements ClusterManager{
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
markAsMaster();
- // Allow Leader to update distributed map.
- if (isMaster) {
- String leftMemberID = getHazelCastNodeID(membershipEvent.getMember());
- }
}
@Override
@@ -102,6 +121,32 @@ public class HazelcastClusterImpl implements ClusterManager{
}
}
+ public void publishProcessStoreEvent(Object deployedEvent) {
+ deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
+ clusterMessageTopic.publish(deployedEvent);
+ }
+
+
+ class ClusterMessageListener implements MessageListener<Object> {
+ @Override
+ public void onMessage(Message<Object> msg) {
+ handleEvent(msg.getMessageObject());
+ }
+ }
+
+ public void handleEvent(Object message) {
+ if (message instanceof ProcessStoreDeployedEvent) {
+ ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
+
+ if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
+ String duName = event.deploymentUnit;
+ __log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName);
+ _clusterProcessStore.publishService(duName);
+ } else deployInitiator = null;
+ }
+
+ }
+
public void markAsMaster() {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember()) {
@@ -114,7 +159,9 @@ public class HazelcastClusterImpl implements ClusterManager{
return isMaster;
}
- public HazelcastInstance getHazelcastInstance() {
- return _hazelcastInstance;
+ public void setClusterProcessStore(Object store) {
+ if (store instanceof ClusterProcessStoreImpl)
+ _clusterProcessStore = (ClusterProcessStoreImpl) store;
}
}
+
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
index e201b70..f9d1004 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -21,7 +21,6 @@ package org.apache.ode.clustering.hazelcast;
* Constants used in Hazelcast based clustering implementation
*/
public final class HazelcastConstants {
- public static final String ODE_CLUSTER_NODE_MAP = "ODE_NODE_ID_MAP";
public static final String ODE_CLUSTER_LOCK_MAP = "ODE_LOCK_MAP";
private HazelcastConstants() {
http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
deleted file mode 100644
index 9e8c59b..0000000
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.ode.clustering.hazelcast;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.FileSystemXmlConfig;
-import com.hazelcast.core.*;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-/**
- * This is to create hazelcast instance.
- * It sets the config object using hazelcast.xml file.First, it looks for the hazelcast.config system property. If it is set, its value is used as the path.
- * Else it will load the hazelcast.xml file using FileSystemXmlConfig()
- */
-public class HazelcastInstanceConfig {
- private HazelcastInstance hazelcastInstance;
-
- public HazelcastInstanceConfig() {
- hazelcastInstance = Hazelcast.newHazelcastInstance();
- }
-
- /**
- *
- * @param hzXml
- */
- public HazelcastInstanceConfig(File hzXml) {
- try {
- Config config = new FileSystemXmlConfig(hzXml);
- hazelcastInstance = Hazelcast.newHazelcastInstance(config);
- } catch (FileNotFoundException fnf) {
- }
- }
-
- public HazelcastInstance getHazelcastInstance() {
- return hazelcastInstance;
- }
-}
-