You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/01 10:43:26 UTC
stratos git commit: Moving distributed object provider class to
stratos common module
Repository: stratos
Updated Branches:
refs/heads/master dee7291cf -> dce11f62f
Moving distributed object provider class to stratos common module
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/dce11f62
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/dce11f62
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/dce11f62
Branch: refs/heads/master
Commit: dce11f62f2d1c12b7887447513053e1ec58131d9
Parents: dee7291
Author: Imesh Gunaratne <im...@apache.org>
Authored: Mon Dec 1 15:13:05 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Dec 1 15:13:20 2014 +0530
----------------------------------------------------------------------
.../clustering/DistributedObjectHandler.java | 140 ---------------
.../context/CloudControllerContext.java | 62 +++----
components/org.apache.stratos.common/pom.xml | 1 +
.../clustering/DistributedObjectProvider.java | 173 +++++++++++++++++++
4 files changed, 205 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
deleted file mode 100644
index 2b4437e..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.clustering;
-
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
-import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * An object handler for managing objects in distributed and non-distributed environments.
- */
-public class DistributedObjectHandler {
- private static final Log log = LogFactory.getLog(DistributedObjectHandler.class);
-
- private final boolean clustered;
- private final HazelcastInstance hazelcastInstance;
-
- public DistributedObjectHandler(boolean clustered, HazelcastInstance hazelcastInstance) {
- this.clustered = clustered;
- this.hazelcastInstance = hazelcastInstance;
- }
-
- private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
- }
- ILock lock = hazelcastInstance.getLock(object);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
- }
- return lock;
- }
-
- private void releaseDistributedLock(ILock lock) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
- }
- lock.forceUnlock();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Distributed lock released for %s", lock.getKey()));
- }
- }
-
- public Map getMap(String key) {
- if(clustered) {
- return hazelcastInstance.getMap(key);
- } else {
- return new ConcurrentHashMap<Object, Object>();
- }
- }
-
- public List getList(String name) {
- if(clustered) {
- return hazelcastInstance.getList(name);
- } else {
- return new ArrayList();
- }
- }
-
- public void putToMap(Map map, Object key, Object value) {
- if(clustered) {
- ILock lock = null;
- try {
- lock = acquireDistributedLock(map);
- ((IMap)map).set(key, value);
- } finally {
- releaseDistributedLock(lock);
- }
- } else {
- map.put(key, value);
- }
- }
-
- public void removeFromMap(Map map, Object key) {
- if(clustered) {
- ILock lock = null;
- try {
- lock = acquireDistributedLock(map);
- ((IMap)map).delete(key);
- } finally {
- releaseDistributedLock(lock);
- }
- } else {
- map.remove(key);
- }
- }
-
- public void addToList(List list, Object value) {
- if(clustered) {
- ILock lock = null;
- try {
- lock = acquireDistributedLock(list);
- ((IList)list).add(value);
- } finally {
- releaseDistributedLock(lock);
- }
- } else {
- list.add(value);
- }
- }
-
- public void removeFromList(List list, Object value) {
- if(clustered) {
- ILock lock = null;
- try {
- lock = acquireDistributedLock(list);
- ((IList)list).remove(value);
- } finally {
- releaseDistributedLock(lock);
- }
- } else {
- list.remove(value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index b589ecc..969c3c0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -22,7 +22,7 @@ import org.apache.axis2.clustering.ClusteringAgent;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.clustering.DistributedObjectHandler;
+import org.apache.stratos.common.clustering.DistributedObjectProvider;
import org.apache.stratos.cloud.controller.domain.*;
import org.apache.stratos.cloud.controller.internal.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.registry.Deserializer;
@@ -60,7 +60,7 @@ public class CloudControllerContext implements Serializable {
private static volatile CloudControllerContext instance;
- private final DistributedObjectHandler distributedObjectHandler;
+ private final DistributedObjectProvider distributedObjectProvider;
/* We keep following maps in order to make the look up time, small. */
@@ -131,18 +131,18 @@ public class CloudControllerContext implements Serializable {
}
// Initialize distributed object handler
- distributedObjectHandler = new DistributedObjectHandler(isClustered(),
+ distributedObjectProvider = new DistributedObjectProvider(isClustered(),
ServiceReferenceHolder.getInstance().getHazelcastInstance());
// Initialize objects
- clusterIdToMemberContextListMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
- memberIdToMemberContextMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
- memberIdToScheduledTaskMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_SCH_TASK);
- kubClusterIdToKubClusterContextMap = distributedObjectHandler.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
- clusterIdToContextMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
- cartridgeTypeToPartitionIdsMap = distributedObjectHandler.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
- cartridges = distributedObjectHandler.getList(CC_CARTRIDGES);
- serviceGroups = distributedObjectHandler.getList(CC_SERVICE_GROUPS);
+ clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
+ memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
+ memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
+ kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
+ clusterIdToContextMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
+ cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
+ cartridges = distributedObjectProvider.getList(CC_CARTRIDGES);
+ serviceGroups = distributedObjectProvider.getList(CC_SERVICE_GROUPS);
// Update context from the registry
updateContextFromRegistry();
@@ -185,7 +185,7 @@ public class CloudControllerContext implements Serializable {
}
public void addCartridge(Cartridge newCartridges) {
- distributedObjectHandler.addToList(cartridges, newCartridges);
+ distributedObjectProvider.addToList(cartridges, newCartridges);
}
public ServiceGroup getServiceGroup(String name) {
@@ -198,7 +198,7 @@ public class CloudControllerContext implements Serializable {
}
public void addServiceGroup(ServiceGroup newServiceGroup) {
- distributedObjectHandler.addToList(serviceGroups, newServiceGroup);
+ distributedObjectProvider.addToList(serviceGroups, newServiceGroup);
}
public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
@@ -240,17 +240,17 @@ public class CloudControllerContext implements Serializable {
}
public void addMemberContext(MemberContext memberContext) {
- distributedObjectHandler.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
+ distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
List<MemberContext> memberContextList;
if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
memberContextList = new ArrayList<MemberContext>();
}
if (memberContextList.contains(memberContext)) {
- distributedObjectHandler.removeFromList(memberContextList,memberContext);
+ distributedObjectProvider.removeFromList(memberContextList,memberContext);
}
- distributedObjectHandler.addToList(memberContextList, memberContext);
- distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
+ distributedObjectProvider.addToList(memberContextList, memberContext);
+ distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
memberContextList);
if (log.isDebugEnabled()) {
log.debug("Added member context to the cloud controller context: " + memberContext);
@@ -258,20 +258,20 @@ public class CloudControllerContext implements Serializable {
}
public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) {
- distributedObjectHandler.putToMap(memberIdToScheduledTaskMap, memberId, job);
+ distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job);
}
public List<MemberContext> removeMemberContextsOfCluster(String clusterId) {
List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
- distributedObjectHandler.removeFromMap(clusterIdToMemberContextListMap, clusterId);
+ distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId);
if (memberContextList == null) {
return new ArrayList<MemberContext>();
}
for (MemberContext memberContext : memberContextList) {
String memberId = memberContext.getMemberId();
- distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId);
+ distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
- distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId);
+ distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
stopTask(task);
if (log.isDebugEnabled()) {
@@ -284,7 +284,7 @@ public class CloudControllerContext implements Serializable {
public MemberContext removeMemberContext(String memberId, String clusterId) {
MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId);
- distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId);
+ distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
if (memberContextList != null) {
@@ -298,10 +298,10 @@ public class CloudControllerContext implements Serializable {
iterator.remove();
}
}
- distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
+ distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
}
ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
- distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId);
+ distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
stopTask(task);
return removedMemberContext;
}
@@ -322,7 +322,7 @@ public class CloudControllerContext implements Serializable {
}
public void addClusterContext(ClusterContext ctxt) {
- distributedObjectHandler.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
+ distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
}
public ClusterContext getClusterContext(String clusterId) {
@@ -331,7 +331,7 @@ public class CloudControllerContext implements Serializable {
public ClusterContext removeClusterContext(String clusterId) {
ClusterContext removed = clusterIdToContextMap.get(clusterId);
- distributedObjectHandler.removeFromMap(clusterIdToContextMap, clusterId);
+ distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId);
return removed;
}
@@ -349,11 +349,11 @@ public class CloudControllerContext implements Serializable {
list = new ArrayList<String>();
}
list.add(partitionId);
- distributedObjectHandler.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
+ distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
}
public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
- distributedObjectHandler.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
+ distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
}
public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) {
@@ -361,7 +361,7 @@ public class CloudControllerContext implements Serializable {
}
public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) {
- distributedObjectHandler.putToMap(kubClusterIdToKubClusterContextMap,
+ distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap,
kubernetesClusterContext.getKubernetesClusterId(),
kubernetesClusterContext);
}
@@ -420,13 +420,13 @@ public class CloudControllerContext implements Serializable {
private void copyMap(Map sourceMap, Map destinationMap) {
for(Object key : sourceMap.keySet()) {
- distributedObjectHandler.putToMap(destinationMap, key, sourceMap.get(key));
+ distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key));
}
}
private void copyList(List sourceList, List destinationList) {
for(Object item : sourceList) {
- distributedObjectHandler.addToList(destinationList, item);
+ distributedObjectProvider.addToList(destinationList, item);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 618eb5f..6c33f0d 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -46,6 +46,7 @@
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Export-Package>
org.apache.stratos.common.*,
+ org.apache.stratos.common.clustering.*,
org.apache.stratos.common.statistics.publisher.*,
</Export-Package>
<Import-Package>
http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
new file mode 100644
index 0000000..fe47ca4
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ILock;
+import com.hazelcast.core.IMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Provides objects to be managed in distributed and non-distributed environments.
+ */
+public class DistributedObjectProvider {
+ private static final Log log = LogFactory.getLog(DistributedObjectProvider.class);
+
+ private final boolean clustered;
+ private final HazelcastInstance hazelcastInstance;
+
+ public DistributedObjectProvider(boolean clustered, HazelcastInstance hazelcastInstance) {
+ this.clustered = clustered;
+ this.hazelcastInstance = hazelcastInstance;
+ }
+
+ private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
+ }
+ ILock lock = hazelcastInstance.getLock(object);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
+ }
+ return lock;
+ }
+
+ private void releaseDistributedLock(ILock lock) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+ }
+ lock.forceUnlock();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Distributed lock released for %s", lock.getKey()));
+ }
+ }
+
+ /**
+ * If clustering is enabled returns a distributed map object, otherwise returns a
+ * concurrent local map object.
+ * @param key
+ * @return
+ */
+ public Map getMap(String key) {
+ if(clustered) {
+ return hazelcastInstance.getMap(key);
+ } else {
+ return new ConcurrentHashMap<Object, Object>();
+ }
+ }
+
+ /**
+ * If clustering is enabled returns a distributed list, otherwise returns
+ * a local array list.
+ * @param name
+ * @return
+ */
+ public List getList(String name) {
+ if(clustered) {
+ return hazelcastInstance.getList(name);
+ } else {
+ return new ArrayList();
+ }
+ }
+
+ /**
+ * Put a key value pair to a map, if clustered use a distributed lock.
+ * @param map
+ * @param key
+ * @param value
+ */
+ public void putToMap(Map map, Object key, Object value) {
+ if(clustered) {
+ ILock lock = null;
+ try {
+ lock = acquireDistributedLock(map);
+ ((IMap)map).set(key, value);
+ } finally {
+ releaseDistributedLock(lock);
+ }
+ } else {
+ map.put(key, value);
+ }
+ }
+
+ /**
+ * Remove an object from a map, if clustered use a distributed lock.
+ * @param map
+ * @param key
+ */
+ public void removeFromMap(Map map, Object key) {
+ if(clustered) {
+ ILock lock = null;
+ try {
+ lock = acquireDistributedLock(map);
+ ((IMap)map).delete(key);
+ } finally {
+ releaseDistributedLock(lock);
+ }
+ } else {
+ map.remove(key);
+ }
+ }
+
+ /**
+ * Add an object to a list, if clustered use a distributed lock.
+ * @param list
+ * @param value
+ */
+ public void addToList(List list, Object value) {
+ if(clustered) {
+ ILock lock = null;
+ try {
+ lock = acquireDistributedLock(list);
+ ((IList)list).add(value);
+ } finally {
+ releaseDistributedLock(lock);
+ }
+ } else {
+ list.add(value);
+ }
+ }
+
+ /**
+ * Remove an object from a list, if clustered use a distributed lock.
+ * @param list
+ * @param value
+ */
+ public void removeFromList(List list, Object value) {
+ if(clustered) {
+ ILock lock = null;
+ try {
+ lock = acquireDistributedLock(list);
+ ((IList)list).remove(value);
+ } finally {
+ releaseDistributedLock(lock);
+ }
+ } else {
+ list.remove(value);
+ }
+ }
+}