You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/01 10:43:26 UTC

stratos git commit: Moving distributed object provider class to stratos common module

Repository: stratos
Updated Branches:
  refs/heads/master dee7291cf -> dce11f62f


Moving distributed object provider class to stratos common module


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/dce11f62
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/dce11f62
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/dce11f62

Branch: refs/heads/master
Commit: dce11f62f2d1c12b7887447513053e1ec58131d9
Parents: dee7291
Author: Imesh Gunaratne <im...@apache.org>
Authored: Mon Dec 1 15:13:05 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Dec 1 15:13:20 2014 +0530

----------------------------------------------------------------------
 .../clustering/DistributedObjectHandler.java    | 140 ---------------
 .../context/CloudControllerContext.java         |  62 +++----
 components/org.apache.stratos.common/pom.xml    |   1 +
 .../clustering/DistributedObjectProvider.java   | 173 +++++++++++++++++++
 4 files changed, 205 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
deleted file mode 100644
index 2b4437e..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.clustering;
-
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
-import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * An object handler for managing objects in distributed and non-distributed environments.
- */
-public class DistributedObjectHandler {
-    private static final Log log = LogFactory.getLog(DistributedObjectHandler.class);
-
-    private final boolean clustered;
-    private final HazelcastInstance hazelcastInstance;
-
-    public DistributedObjectHandler(boolean clustered, HazelcastInstance hazelcastInstance) {
-        this.clustered = clustered;
-        this.hazelcastInstance = hazelcastInstance;
-    }
-
-    private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
-        }
-        ILock lock = hazelcastInstance.getLock(object);
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
-        }
-        return lock;
-    }
-
-    private void releaseDistributedLock(ILock lock) {
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
-        }
-        lock.forceUnlock();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
-        }
-    }
-
-    public Map getMap(String key) {
-        if(clustered) {
-            return hazelcastInstance.getMap(key);
-        } else {
-            return new ConcurrentHashMap<Object, Object>();
-        }
-    }
-
-    public List getList(String name) {
-        if(clustered) {
-            return hazelcastInstance.getList(name);
-        } else {
-            return new ArrayList();
-        }
-    }
-
-    public void putToMap(Map map, Object key, Object value) {
-         if(clustered) {
-             ILock lock = null;
-             try {
-                 lock = acquireDistributedLock(map);
-                 ((IMap)map).set(key, value);
-             } finally {
-                 releaseDistributedLock(lock);
-             }
-         } else {
-            map.put(key, value);
-         }
-    }
-
-    public void removeFromMap(Map map, Object key) {
-        if(clustered) {
-            ILock lock = null;
-            try {
-                lock = acquireDistributedLock(map);
-                ((IMap)map).delete(key);
-            } finally {
-                releaseDistributedLock(lock);
-            }
-        } else {
-            map.remove(key);
-        }
-    }
-
-    public void addToList(List list, Object value) {
-        if(clustered) {
-            ILock lock = null;
-            try {
-                lock = acquireDistributedLock(list);
-                ((IList)list).add(value);
-            } finally {
-                releaseDistributedLock(lock);
-            }
-        } else {
-            list.add(value);
-        }
-    }
-
-    public void removeFromList(List list, Object value) {
-        if(clustered) {
-            ILock lock = null;
-            try {
-                lock = acquireDistributedLock(list);
-                ((IList)list).remove(value);
-            } finally {
-                releaseDistributedLock(lock);
-            }
-        } else {
-            list.remove(value);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index b589ecc..969c3c0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -22,7 +22,7 @@ import org.apache.axis2.clustering.ClusteringAgent;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.clustering.DistributedObjectHandler;
+import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.internal.ServiceReferenceHolder;
 import org.apache.stratos.cloud.controller.registry.Deserializer;
@@ -60,7 +60,7 @@ public class CloudControllerContext implements Serializable {
 
     private static volatile CloudControllerContext instance;
 
-    private final DistributedObjectHandler distributedObjectHandler;
+    private final DistributedObjectProvider distributedObjectProvider;
 
 	/* We keep following maps in order to make the look up time, small. */
 
@@ -131,18 +131,18 @@ public class CloudControllerContext implements Serializable {
         }
 
         // Initialize distributed object handler
-        distributedObjectHandler = new DistributedObjectHandler(isClustered(),
+        distributedObjectProvider = new DistributedObjectProvider(isClustered(),
                 ServiceReferenceHolder.getInstance().getHazelcastInstance());
 
         // Initialize objects
-        clusterIdToMemberContextListMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
-        memberIdToMemberContextMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
-        memberIdToScheduledTaskMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_SCH_TASK);
-        kubClusterIdToKubClusterContextMap = distributedObjectHandler.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
-        clusterIdToContextMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
-        cartridgeTypeToPartitionIdsMap = distributedObjectHandler.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
-        cartridges = distributedObjectHandler.getList(CC_CARTRIDGES);
-        serviceGroups = distributedObjectHandler.getList(CC_SERVICE_GROUPS);
+        clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
+        memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
+        memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
+        kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
+        clusterIdToContextMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
+        cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
+        cartridges = distributedObjectProvider.getList(CC_CARTRIDGES);
+        serviceGroups = distributedObjectProvider.getList(CC_SERVICE_GROUPS);
 
         // Update context from the registry
         updateContextFromRegistry();
@@ -185,7 +185,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addCartridge(Cartridge newCartridges) {
-        distributedObjectHandler.addToList(cartridges, newCartridges);
+        distributedObjectProvider.addToList(cartridges, newCartridges);
     }
 
     public ServiceGroup getServiceGroup(String name) {
@@ -198,7 +198,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addServiceGroup(ServiceGroup newServiceGroup) {
-        distributedObjectHandler.addToList(serviceGroups, newServiceGroup);
+        distributedObjectProvider.addToList(serviceGroups, newServiceGroup);
     }
 
     public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
@@ -240,17 +240,17 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addMemberContext(MemberContext memberContext) {
-        distributedObjectHandler.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
+        distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
 
         List<MemberContext> memberContextList;
         if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
             memberContextList = new ArrayList<MemberContext>();
         }
         if (memberContextList.contains(memberContext)) {
-            distributedObjectHandler.removeFromList(memberContextList,memberContext);
+            distributedObjectProvider.removeFromList(memberContextList,memberContext);
         }
-        distributedObjectHandler.addToList(memberContextList, memberContext);
-        distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
+        distributedObjectProvider.addToList(memberContextList, memberContext);
+        distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
                 memberContextList);
         if (log.isDebugEnabled()) {
             log.debug("Added member context to the cloud controller context: " + memberContext);
@@ -258,20 +258,20 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) {
-        distributedObjectHandler.putToMap(memberIdToScheduledTaskMap, memberId, job);
+        distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job);
     }
 
     public List<MemberContext> removeMemberContextsOfCluster(String clusterId) {
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
-        distributedObjectHandler.removeFromMap(clusterIdToMemberContextListMap, clusterId);
+        distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId);
         if (memberContextList == null) {
             return new ArrayList<MemberContext>();
         }
         for (MemberContext memberContext : memberContextList) {
             String memberId = memberContext.getMemberId();
-            distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId);
+            distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
             ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-            distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId);
+            distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
             stopTask(task);
 
             if (log.isDebugEnabled()) {
@@ -284,7 +284,7 @@ public class CloudControllerContext implements Serializable {
 
     public MemberContext removeMemberContext(String memberId, String clusterId) {
         MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId);
-        distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId);
+        distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
 
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
         if (memberContextList != null) {
@@ -298,10 +298,10 @@ public class CloudControllerContext implements Serializable {
                     iterator.remove();
                 }
             }
-            distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
+            distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
         }
         ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-        distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId);
+        distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
         stopTask(task);
         return removedMemberContext;
     }
@@ -322,7 +322,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addClusterContext(ClusterContext ctxt) {
-        distributedObjectHandler.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
+        distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
     }
 
     public ClusterContext getClusterContext(String clusterId) {
@@ -331,7 +331,7 @@ public class CloudControllerContext implements Serializable {
 
     public ClusterContext removeClusterContext(String clusterId) {
         ClusterContext removed = clusterIdToContextMap.get(clusterId);
-        distributedObjectHandler.removeFromMap(clusterIdToContextMap, clusterId);
+        distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId);
         return removed;
     }
 
@@ -349,11 +349,11 @@ public class CloudControllerContext implements Serializable {
             list = new ArrayList<String>();
         }
         list.add(partitionId);
-        distributedObjectHandler.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
+        distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
     }
 
     public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
-        distributedObjectHandler.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
+        distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
     }
 
     public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) {
@@ -361,7 +361,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) {
-        distributedObjectHandler.putToMap(kubClusterIdToKubClusterContextMap,
+        distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap,
                 kubernetesClusterContext.getKubernetesClusterId(),
                 kubernetesClusterContext);
     }
@@ -420,13 +420,13 @@ public class CloudControllerContext implements Serializable {
 
     private void copyMap(Map sourceMap, Map destinationMap) {
         for(Object key : sourceMap.keySet()) {
-            distributedObjectHandler.putToMap(destinationMap, key, sourceMap.get(key));
+            distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key));
         }
     }
 
     private void copyList(List sourceList, List destinationList) {
         for(Object item : sourceList) {
-            distributedObjectHandler.addToList(destinationList, item);
+            distributedObjectProvider.addToList(destinationList, item);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 618eb5f..6c33f0d 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -46,6 +46,7 @@
                         <Bundle-Name>${project.artifactId}</Bundle-Name>
                         <Export-Package>
                             org.apache.stratos.common.*,
+                            org.apache.stratos.common.clustering.*,
                             org.apache.stratos.common.statistics.publisher.*,
                         </Export-Package>
                         <Import-Package>

http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
new file mode 100644
index 0000000..fe47ca4
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ILock;
+import com.hazelcast.core.IMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Provides maps and lists that are Hazelcast-backed when clustering is enabled and plain
+ * local collections otherwise, plus helpers that mutate them. In clustered mode each
+ * mutation of a distributed collection is performed while holding a cluster-wide lock.
+ */
+public class DistributedObjectProvider {
+    private static final Log log = LogFactory.getLog(DistributedObjectProvider.class);
+
+    private final boolean clustered;
+    private final HazelcastInstance hazelcastInstance;
+
+    /** Creates a provider; hazelcastInstance is only used when clustered is true. */
+    public DistributedObjectProvider(boolean clustered, HazelcastInstance hazelcastInstance) {
+        this.clustered = clustered;
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    /** Blocks until the calling thread holds the cluster-wide lock keyed by the given name. */
+    private ILock acquireDistributedLock(String name) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Acquiring distributed lock for %s...", name));
+        }
+        ILock lock = hazelcastInstance.getLock(name);
+        // getLock() only hands back a handle; the lock is not held until lock() returns.
+        lock.lock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock acquired for %s", name));
+        }
+        return lock;
+    }
+
+    /** Releases a lock that the calling thread holds via acquireDistributedLock(). */
+    private void releaseDistributedLock(ILock lock) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+        }
+        lock.unlock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
+        }
+    }
+
+    /**
+     * Returns the distributed map with the given name, or a new local concurrent map.
+     * @param key name of the distributed map; ignored when not clustered
+     * @return a Hazelcast map if clustered, otherwise a new ConcurrentHashMap
+     */
+    public Map getMap(String key) {
+        if (clustered) {
+            return hazelcastInstance.getMap(key);
+        }
+        return new ConcurrentHashMap<Object, Object>();
+    }
+
+    /**
+     * Returns the distributed list with the given name, or a new local array list.
+     * @param name name of the distributed list; ignored when not clustered
+     * @return a Hazelcast list if clustered, otherwise a new (unsynchronized) ArrayList
+     */
+    public List getList(String name) {
+        if (clustered) {
+            return hazelcastInstance.getList(name);
+        }
+        return new ArrayList();
+    }
+
+    /**
+     * Puts an entry into a map; a distributed map is updated under its distributed lock.
+     * @param map   map to update; anything other than a distributed map is updated directly
+     * @param key   key of the entry
+     * @param value value of the entry
+     */
+    public void putToMap(Map map, Object key, Object value) {
+        if (clustered && map instanceof IMap) {
+            // Acquire outside the try block so a failed acquisition is never "released".
+            ILock lock = acquireDistributedLock(((IMap) map).getName());
+            try {
+                ((IMap) map).set(key, value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            map.put(key, value);
+        }
+    }
+
+    /**
+     * Removes a key from a map; a distributed map is updated under its distributed lock.
+     * @param map map to update; anything other than a distributed map is updated directly
+     * @param key key of the entry to remove
+     */
+    public void removeFromMap(Map map, Object key) {
+        if (clustered && map instanceof IMap) {
+            ILock lock = acquireDistributedLock(((IMap) map).getName());
+            try {
+                ((IMap) map).delete(key);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            map.remove(key);
+        }
+    }
+
+    /**
+     * Adds an object to a list; a distributed list is updated under its distributed lock.
+     * @param list  list to update; anything other than a distributed list is updated directly
+     * @param value object to add
+     */
+    public void addToList(List list, Object value) {
+        if (clustered && list instanceof IList) {
+            ILock lock = acquireDistributedLock(((IList) list).getName());
+            try {
+                ((IList) list).add(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.add(value);
+        }
+    }
+
+    /**
+     * Removes an object from a list; a distributed list is updated under its distributed lock.
+     * @param list  list to update; anything other than a distributed list is updated directly
+     * @param value object to remove
+     */
+    public void removeFromList(List list, Object value) {
+        if (clustered && list instanceof IList) {
+            ILock lock = acquireDistributedLock(((IList) list).getName());
+            try {
+                ((IList) list).remove(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.remove(value);
+        }
+    }
+}