You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2018/10/07 05:19:22 UTC
[karaf-cellar] branch cellar-4.1.x updated: [KARAF-5562] Improve
cellar groups configuration synchronisation from hazelcast
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch cellar-4.1.x
in repository https://gitbox.apache.org/repos/asf/karaf-cellar.git
The following commit(s) were added to refs/heads/cellar-4.1.x by this push:
new 0ce1d86 [KARAF-5562] Improve cellar groups configuration synchronisation from hazelcast
0ce1d86 is described below
commit 0ce1d861327f763f824f62909eee97469572a441
Author: Thomas Draier <td...@jahia.com>
AuthorDate: Mon Jan 8 11:20:31 2018 +0100
[KARAF-5562] Improve cellar groups configuration synchronisation from hazelcast
---
.../cellar/hazelcast/HazelcastGroupManager.java | 442 +++++++++++++--------
1 file changed, 274 insertions(+), 168 deletions(-)
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
index 96b923d..835b547 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
@@ -13,15 +13,8 @@
*/
package org.apache.karaf.cellar.hazelcast;
-import java.io.IOException;
-import java.util.*;
-
import com.hazelcast.core.*;
-import org.apache.karaf.cellar.core.Configurations;
-import org.apache.karaf.cellar.core.Group;
-import org.apache.karaf.cellar.core.GroupManager;
-import org.apache.karaf.cellar.core.Node;
-import org.apache.karaf.cellar.core.Synchronizer;
+import org.apache.karaf.cellar.core.*;
import org.apache.karaf.cellar.core.event.EventConsumer;
import org.apache.karaf.cellar.core.event.EventProducer;
import org.apache.karaf.cellar.core.event.EventTransportFactory;
@@ -36,22 +29,26 @@ import org.osgi.service.cm.ConfigurationEvent;
import org.osgi.service.cm.ConfigurationListener;
import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.*;
+
/**
* A group manager implementation powered by Hazelcast.
* The role of this class is to provide means of creating groups, setting nodes to groups etc.
* Keep in sync the distributed group configuration with the locally persisted.
*/
-public class HazelcastGroupManager implements GroupManager, EntryListener, ConfigurationListener {
+public class HazelcastGroupManager implements GroupManager, EntryListener<String,Object>, ConfigurationListener {
private static final transient Logger LOGGER = org.slf4j.LoggerFactory.getLogger(HazelcastGroupManager.class);
- private static final String GROUPS = "org.apache.karaf.cellar.groups";
- private static final String GROUPS_CONFIG = "org.apache.karaf.cellar.groups.config";
- private static final String DEFAULT_GROUP = "default";
+ private static final String HAZELCAST_GROUPS = "org.apache.karaf.cellar.groups";
+ private static final String HAZELCAST_GROUPS_CONFIG = "org.apache.karaf.cellar.groups.config";
private Map<String, ServiceRegistration> producerRegistrations = new HashMap<String, ServiceRegistration>();
private Map<String, ServiceRegistration> consumerRegistrations = new HashMap<String, ServiceRegistration>();
+ private Map<String, Object> localConfig = new HashMap<String, Object>();
+
private Map<String, EventProducer> groupProducers = new HashMap<String, EventProducer>();
private Map<String, EventConsumer> groupConsumer = new HashMap<String, EventConsumer>();
@@ -64,47 +61,140 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
private CombinedClassLoader combinedClassLoader;
public void init() {
- // create a listener for group configuration.
- IMap groupConfiguration = instance.getMap(GROUPS_CONFIG);
- groupConfiguration.addEntryListener(this, true);
try {
// create group stored in configuration admin
- Configuration configuration = configurationAdmin.getConfiguration(Configurations.GROUP, null);
- if (configuration != null) {
- Dictionary<String, Object> properties = configuration.getProperties();
- if (properties == null) {
+ Configuration groupsConfiguration = getConfigurationForGroups();
+ Dictionary<String, Object> properties = groupsConfiguration.getProperties();
+
+ // create a listener for group configuration.
+ IMap<String,Object> hazelcastGroupsConfig = getClusterGroupsConfig();
+
+ hazelcastGroupsConfig.addEntryListener(this, true);
+
+ if (hazelcastGroupsConfig.isEmpty()) {
+ // First one to be here - initialize hazelcast map with local configuration
+ LOGGER.debug("CELLAR HAZELCAST: intialize cluster with local config");
+
+ Map<String, Object> updates = getUpdatesForHazelcastMap(properties);
+ hazelcastGroupsConfig.putAll(updates);
+ } else {
+ if (properties != null) {
+ Enumeration<String> en = properties.keys();
+ while (en.hasMoreElements()) {
+ String key = en.nextElement();
+ localConfig.put(key , properties.get(key));
+ }
+ } else {
properties = new Hashtable<String, Object>();
}
- String groups = (String) properties.get(Configurations.GROUPS_KEY);
- Set<String> groupNames = convertStringToSet(groups);
- if (groupNames != null && !groupNames.isEmpty()) {
- for (String groupName : groupNames) {
- createGroup(groupName);
- }
+ boolean updated = false;
+ for (String key : hazelcastGroupsConfig.keySet()) {
+ Object value = hazelcastGroupsConfig.get(key);
+ updated |= updatePropertiesFromHazelcastMap(properties, key, value);
+ }
+ if (updated) {
+ updateConfiguration(groupsConfiguration, properties);
}
}
+
+ Node node = getNode();
+
+ // add group membership from configuration
+ properties = getConfigurationForNode().getProperties();
+ Set<String> groupNames = convertStringToSet(properties != null ? (String) properties.get(Configurations.GROUPS_KEY) : null);
+ getClusterGroups().put(node, groupNames);
} catch (IOException e) {
LOGGER.warn("CELLAR HAZELCAST: can't create cluster group from configuration admin", e);
}
- try {
- // add group membership from configuration
- Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE, null);
- if (configuration != null) {
- Dictionary<String, Object> properties = configuration.getProperties();
- if (properties == null) {
- properties = new Hashtable<String, Object>();
+ }
+
+ private boolean updatePropertiesFromHazelcastMap(Dictionary<String, Object> properties, String key, Object value) {
+ if (!(value instanceof Map)) {
+ return false;
+ }
+ boolean changed = false;
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) value;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ String entryKey = entry.getKey();
+ Object entryValue = entry.getValue();
+ if (entryKey.equals(".change")) {
+ Set<String> groups = convertStringToSet((String) properties.get(Configurations.GROUPS_KEY));
+ if ((entryValue.equals("added") && !groups.contains(key))
+ || entryValue.equals("removed") && groups.contains(key)) {
+ LOGGER.debug("CELLAR HAZELCAST: get group " + key + " configuration from cluster : " + key
+ + " has been " + entryValue);
+ if (entryValue.equals("added")) {
+ groups.add(key);
+ } else {
+ groups.remove(key);
+ }
+ String newValue = convertSetToString(groups);
+ properties.put(Configurations.GROUPS_KEY, newValue);
+ localConfig.put(Configurations.GROUPS_KEY, newValue);
+ changed = true;
}
- String groups = (String) properties.get(Configurations.GROUPS_KEY);
- Set<String> groupNames = convertStringToSet(groups);
- if (groupNames != null && !groupNames.isEmpty()) {
- for (String groupName : groupNames) {
- registerGroup(groupName);
+ } else if (properties.get(entryKey) == null || !properties.get(entryKey).equals(entryValue)) {
+ LOGGER.debug("CELLAR HAZELCAST: get group " + key + " configuration from cluster : " + entryKey + " = "
+ + entryValue);
+ properties.put(entryKey, entryValue);
+ localConfig.put(entryKey, entryValue);
+ changed = true;
+ }
+ }
+ return changed;
+ }
+
+ private Map<String, Object> getUpdatesForHazelcastMap(Dictionary<String, Object> properties) {
+ Map<String,Object> updates = new HashMap<String,Object>();
+ if (properties == null) {
+ return updates;
+ }
+ Enumeration<String> en = properties.keys();
+ while (en.hasMoreElements()) {
+ String key = en.nextElement();
+ Object value = properties.get(key);
+
+ if (!key.startsWith("felix.") && !key.startsWith("service.")) {
+ Object localValue = localConfig.get(key);
+ if (localValue == null || !localValue.equals(value)) {
+ if (key.equals(Configurations.GROUPS_KEY)) {
+ Set<String> removedGroups = convertStringToSet((String) localValue);
+ Set<String> addedGroups = convertStringToSet((String) value);
+ addedGroups.removeAll(removedGroups);
+ removedGroups.removeAll(convertStringToSet((String) value));
+ for (String addedGroup : addedGroups) {
+ getOrCreateMap(updates, addedGroup).put(".change", "added");
+ }
+ for (String removedGroup : removedGroups) {
+ getOrCreateMap(updates, removedGroup).put(".change", "removed");
+ }
+ } else {
+ int idx = key.indexOf(".");
+ if (idx > 0) {
+ String groupKey = key.substring(0, idx);
+ getOrCreateMap(updates, groupKey).put(key, value);
+ } else {
+ LOGGER.warn("CELLAR HAZELCAST: found group property that is not prefixed with a group name: {}. Skipping it", key);
+ }
}
}
}
- } catch (IOException e) {
- LOGGER.error("CELLAR HAZELCAST: can't set group membership for the current node", e);
+
+ localConfig.put(key , value);
+
}
+ return updates;
+ }
+
+ private Map<String, Object> getOrCreateMap(Map<String, Object> updates, String group) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> props = (Map<String, Object>) updates.get(group);
+ if (props == null) {
+ props = new HashMap<String, Object>();
+ updates.put(group, props);
+ }
+ return props;
}
public void destroy() {
@@ -113,12 +203,9 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
Thread.currentThread().setContextClassLoader(combinedClassLoader);
// update the group
Node local = this.getNode();
- Set<Group> groups = this.listGroups(local);
- for (Group group : groups) {
- String groupName = group.getName();
- group.getNodes().remove(local);
- listGroups().put(groupName, group);
- }
+
+ getClusterGroups().remove(local);
+
// shutdown the group consumer/producers
for (Map.Entry<String, EventConsumer> consumerEntry : groupConsumer.entrySet()) {
EventConsumer consumer = consumerEntry.getValue();
@@ -153,18 +240,24 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(combinedClassLoader);
- Group group = listGroups().get(groupName);
+ Map<String, Group> listGroups = listGroups();
+ Group group = listGroups.get(groupName);
if (group == null) {
group = new Group(groupName);
- }
- if (!listGroups().containsKey(groupName)) {
- copyGroupConfiguration(Configurations.DEFAULT_GROUP_NAME, groupName);
- listGroups().put(groupName, group);
try {
- // store the group list to configuration admin
- persist(listGroups());
- } catch (Exception e) {
- LOGGER.warn("CELLAR HAZELCAST: can't store cluster group list", e);
+ Configuration configuration = getConfigurationForGroups();
+ if (configuration != null) {
+ Dictionary<String, Object> properties = configuration.getProperties();
+ if (properties != null && !properties.isEmpty()) {
+ properties = copyGroupConfiguration(Configurations.DEFAULT_GROUP_NAME + '.', groupName + '.', properties);
+ Set<String> groups = convertStringToSet((String) properties.get(Configurations.GROUPS_KEY));
+ groups.add(groupName);
+ properties.put(Configurations.GROUPS_KEY, convertSetToString(groups));
+ updateConfiguration(configuration, properties);
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.error("CELLAR HAZELCAST: failed to update cluster group configuration", e);
}
}
return group;
@@ -179,11 +272,15 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
try {
Thread.currentThread().setContextClassLoader(combinedClassLoader);
if (!groupName.equals(Configurations.DEFAULT_GROUP_NAME)) {
- listGroups().remove(groupName);
try {
// store the group list to configuration admin
- persist(listGroups());
- } catch (Exception e) {
+ Configuration configuration = getConfigurationForGroups();
+ Dictionary<String, Object> properties = configuration.getProperties();
+ Set<String> groups = convertStringToSet((String) properties.get(Configurations.GROUPS_KEY));
+ groups.remove(groupName);
+ properties.put(Configurations.GROUPS_KEY, convertSetToString(groups));
+ updateConfiguration(configuration, properties);
+ } catch (IOException e) {
LOGGER.warn("CELLAR HAZELCAST: can't store cluster group list", e);
}
}
@@ -192,24 +289,6 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
}
}
- /**
- * Store the group names in configuration admin.
- *
- * @param groups the list of group to store.
- * @throws Exception in case of storage failure.
- */
- private void persist(Map<String, Group> groups) throws Exception {
- Configuration configuration = configurationAdmin.getConfiguration(Configurations.GROUP, null);
- if (configuration != null) {
- Dictionary<String, Object> properties = configuration.getProperties();
- if (properties == null) {
- properties = new Hashtable<String, Object>();
- }
- properties.put(Configurations.GROUPS_KEY, convertSetToString(groups.keySet()));
- configuration.update(properties);
- }
- }
-
@Override
public Set<Group> listLocalGroups() {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
@@ -264,8 +343,24 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
public Map<String, Group> listGroups() {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
+ Map<String, Group> res = new HashMap<String, Group>();
Thread.currentThread().setContextClassLoader(combinedClassLoader);
- return instance.getMap(GROUPS);
+ Map<Node, Set<String>> nodes = getClusterGroups();
+
+ Set<String> groups = convertStringToSet((String) localConfig.get(Configurations.GROUPS_KEY));
+ groups.add(Configurations.DEFAULT_GROUP_NAME);
+
+ for (String groupName : groups) {
+ Group group = new Group(groupName);
+ res.put(groupName, group);
+ for (Map.Entry<Node, Set<String>> entry : nodes.entrySet()) {
+ if (entry.getValue().contains(groupName)) {
+ group.getNodes().add(entry.getKey());
+ }
+ }
+ }
+
+ return res;
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -278,15 +373,15 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
Thread.currentThread().setContextClassLoader(combinedClassLoader);
Set<Group> result = new HashSet<Group>();
- Map<String, Group> groupMap = instance.getMap(GROUPS);
- Collection<Group> groupCollection = groupMap.values();
- if (groupCollection != null && !groupCollection.isEmpty()) {
- for (Group group : groupCollection) {
- if (group.getNodes().contains(node)) {
- result.add(group);
- }
- }
+ Map<Node, Set<String>> groupMap = getClusterGroups();
+ Set<String> groupNames = groupMap.get(node);
+
+ if (groupNames != null) {
+ Map<String, Group> g = listGroups();
+ g.keySet().retainAll(groupNames);
+ result.addAll(g.values());
}
+
return result;
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
@@ -336,7 +431,6 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
try {
Thread.currentThread().setContextClassLoader(combinedClassLoader);
String groupName = group.getName();
- createGroup(groupName);
LOGGER.debug("CELLAR HAZELCAST: registering cluster group {}.", groupName);
Properties serviceProperties = new Properties();
@@ -366,12 +460,17 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
consumerRegistrations.put(groupName, consumerRegistration);
}
- group.getNodes().add(getNode());
- listGroups().put(groupName, group);
+ Node node = getNode();
+ group.getNodes().add(node);
+ Map<Node, Set<String>> map = getClusterGroups();
+ Set<String> groupNames = (Set<String>) map.get(node);
+ groupNames = new HashSet<String>(groupNames);
+ groupNames.add(groupName);
+ map.put(node, groupNames);
// add group to configuration
try {
- Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE, null);
+ Configuration configuration = getConfigurationForNode();
if (configuration != null) {
Dictionary<String, Object> properties = configuration.getProperties();
if (properties != null) {
@@ -388,7 +487,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
groups = groupName;
}
properties.put(Configurations.GROUPS_KEY, groups);
- configuration.update(properties);
+ updateConfiguration(configuration, properties);
}
}
} catch (IOException e) {
@@ -420,11 +519,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(combinedClassLoader);
- Group group = listGroups().get(groupName);
- if (group == null) {
- group = new Group(groupName);
- }
- registerGroup(group);
+ registerGroup(new Group(groupName));
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -476,9 +571,17 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
consumer.stop();
}
+ Node node = getNode();
+ group.getNodes().add(node);
+ Map<Node, Set<String>> map = getClusterGroups();
+ Set<String> groupNames = (Set<String>) map.get(node);
+ groupNames = new HashSet<String>(groupNames);
+ groupNames.remove(groupName);
+ map.put(node, groupNames);
+
// remove cluster group from configuration
try {
- Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE, null);
+ Configuration configuration = getConfigurationForNode();
Dictionary<String, Object> properties = configuration.getProperties();
String groups = (String) properties.get(Configurations.GROUPS_KEY);
if (groups == null || groups.isEmpty()) {
@@ -489,7 +592,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
groups = convertSetToString(groupNamesSet);
}
properties.put(Configurations.GROUPS_KEY, groups);
- configuration.update(properties);
+ updateConfiguration(configuration, properties);
} catch (IOException e) {
LOGGER.error("CELLAR HAZELCAST: failed to read cluster group configuration", e);
}
@@ -501,50 +604,24 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
/**
* Copy the configuration of a cluster {@link Group}.
*
- * <b>1.</b> Updates configuration admin from Hazelcast using source config.
- * <b>2.</b> Creates target configuration both on Hazelcast and configuration admin.
- *
- * @param sourceGroupName the source cluster group.
- * @param targetGroupName the target cluster group.
+ * @param sourceGroupPrefix the source cluster group prefix
+ * @param targetGroupPrefix the target cluster group prefix
+ * @return new properties object
*/
- public void copyGroupConfiguration(String sourceGroupName, String targetGroupName) {
- try {
- Configuration conf = configurationAdmin.getConfiguration(Configurations.GROUP, null);
- if (conf != null) {
-
- // get configuration from config admin
- Dictionary configAdminProperties = conf.getProperties();
- if (configAdminProperties == null) {
- configAdminProperties = new Properties();
- }
- // get configuration from Hazelcast
- Map<String, String> sourceGroupConfig = instance.getMap(GROUPS_CONFIG);
-
- // update local configuration from cluster
- for (Map.Entry<String, String> parentEntry : sourceGroupConfig.entrySet()) {
- configAdminProperties.put(parentEntry.getKey(), parentEntry.getValue());
- }
-
- Dictionary updatedProperties = new Properties();
- Enumeration keyEnumeration = configAdminProperties.keys();
- while (keyEnumeration.hasMoreElements()) {
- String key = (String) keyEnumeration.nextElement();
- String value = configAdminProperties.get(key).toString();
-
- if (key.startsWith(sourceGroupName)) {
- String newKey = key.replace(sourceGroupName, targetGroupName);
- updatedProperties.put(newKey, value);
- sourceGroupConfig.put(key, value);
- }
- updatedProperties.put(key, value);
- }
-
- conf.update(updatedProperties);
+ public Dictionary<String, Object> copyGroupConfiguration(String sourceGroupPrefix, String targetGroupPrefix, Dictionary<String, Object> properties) {
+ Dictionary<String, Object> updatedProperties = new Hashtable<String, Object>();
+ Enumeration<String> keyEnumeration = properties.keys();
+ while (keyEnumeration.hasMoreElements()) {
+ String key = keyEnumeration.nextElement();
+ Object value = properties.get(key);
+
+ if (key.startsWith(sourceGroupPrefix)) {
+ String newKey = targetGroupPrefix + key.substring(sourceGroupPrefix.length());
+ updatedProperties.put(newKey, value);
}
-
- } catch (IOException e) {
- LOGGER.error("CELLAR HAZELCAST: failed to read cluster group configuration", e);
+ updatedProperties.put(key, value);
}
+ return updatedProperties;
}
/**
@@ -554,7 +631,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @return the String corresponding to the Set.
*/
protected String convertSetToString(Set<String> set) {
- StringBuffer result = new StringBuffer();
+ StringBuilder result = new StringBuilder();
Iterator<String> groupIterator = set.iterator();
while (groupIterator.hasNext()) {
String name = groupIterator.next();
@@ -573,14 +650,14 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @return the Set corresponding to the String.
*/
protected Set<String> convertStringToSet(String string) {
- if (string == null)
- return Collections.EMPTY_SET;
- Set<String> result = new HashSet<String>();
+ if (string == null)
+ return Collections.emptySet();
+ Set<String> result = new TreeSet<String>();
String[] groupNames = string.split(",");
if (groupNames != null && groupNames.length > 0) {
for (String name : groupNames) {
- result.add(name);
+ result.add(name.trim());
}
} else {
result.add(string);
@@ -593,26 +670,35 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
*
* @param configurationEvent the local configuration event.
*/
+ @SuppressWarnings("unchecked")
@Override
public void configurationEvent(ConfigurationEvent configurationEvent) {
- String pid = configurationEvent.getPid();
- if (pid.equals(GROUPS)) {
- Map groupConfiguration = instance.getMap(GROUPS_CONFIG);
-
- try {
- Configuration conf = configurationAdmin.getConfiguration(GROUPS, null);
- Dictionary properties = conf.getProperties();
- Enumeration keyEnumeration = properties.keys();
- while (keyEnumeration.hasMoreElements()) {
- Object key = keyEnumeration.nextElement();
- Object value = properties.get(key);
- if (!groupConfiguration.containsKey(key) || groupConfiguration.get(key) == null || !groupConfiguration.get(key).equals(value)) {
- groupConfiguration.put(key, value);
+ if (!Configurations.GROUP.equals(configurationEvent.getPid())) {
+ return;
+ }
+ try {
+ Map<String, Object> hazelcastGroupConfig = getClusterGroupsConfig();
+ Configuration conf = getConfigurationForGroups();
+ Dictionary<String, Object> properties = conf.getProperties();
+ Map<String, Object> updates = getUpdatesForHazelcastMap(properties);
+ for (Map.Entry<String, Object> entry : updates.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ Object clusterValue = hazelcastGroupConfig.get(key);
+ if (clusterValue == null || !clusterValue.equals(value)) {
+ LOGGER.debug("CELLAR HAZELCAST : sending updates to cluster : " + key + " = " + value);
+ if (clusterValue != null && value instanceof Map) {
+ @SuppressWarnings("rawtypes")
+ Map<String, Object> newValue = new HashMap((Map) clusterValue);
+ newValue.putAll((Map<? extends String, ?>) value);
+ hazelcastGroupConfig.put(key, newValue);
+ } else {
+ hazelcastGroupConfig.put(key, value);
}
}
- } catch (Exception e) {
- LOGGER.warn("CELLAR HAZELCAST: failed to update cluster group configuration", e);
}
+ } catch (Exception e) {
+ LOGGER.warn("CELLAR HAZELCAST: failed to update cluster group configuration", e);
}
}
@@ -622,7 +708,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @param entryEvent entry event
*/
@Override
- public void entryAdded(EntryEvent entryEvent) {
+ public void entryAdded(EntryEvent<String,Object> entryEvent) {
entryUpdated(entryEvent);
}
@@ -632,7 +718,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @param entryEvent entry event
*/
@Override
- public void entryRemoved(EntryEvent entryEvent) {
+ public void entryRemoved(EntryEvent<String,Object> entryEvent) {
entryUpdated(entryEvent);
}
@@ -642,16 +728,16 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @param entryEvent entry event
*/
@Override
- public void entryUpdated(EntryEvent entryEvent) {
- LOGGER.debug("CELLAR HAZELCAST: cluster group configuration has been updated, updating local configuration");
+ public void entryUpdated(EntryEvent<String,Object> entryEvent) {
try {
- Configuration conf = configurationAdmin.getConfiguration(GROUPS, null);
- Dictionary props = conf.getProperties();
- Object key = entryEvent.getKey();
+ Configuration conf = getConfigurationForGroups();
+ Dictionary<String, Object> properties = conf.getProperties();
+ String key = entryEvent.getKey();
Object value = entryEvent.getValue();
- if (props.get(key) == null || !props.get(key).equals(value)) {
- props.put(key, value);
- conf.update(props);
+
+ if (updatePropertiesFromHazelcastMap(properties, key, value)) {
+ LOGGER.debug("CELLAR HAZELCAST: cluster group configuration has been updated, updating local configuration: {} = {}", key, value);
+ updateConfiguration(conf, properties);
}
} catch (Exception ex) {
LOGGER.warn("CELLAR HAZELCAST: failed to update local configuration", ex);
@@ -664,7 +750,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
* @param entryEvent entry event
*/
@Override
- public void entryEvicted(EntryEvent entryEvent) {
+ public void entryEvicted(EntryEvent<String,Object> entryEvent) {
entryUpdated(entryEvent);
}
@@ -718,4 +804,24 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
this.combinedClassLoader = combinedClassLoader;
}
+ private Configuration getConfigurationForGroups() throws IOException {
+ return configurationAdmin.getConfiguration(Configurations.GROUP, null);
+ }
+
+ private Configuration getConfigurationForNode() throws IOException {
+ return configurationAdmin.getConfiguration(Configurations.NODE, null);
+ }
+
+ private IMap<Node, Set<String>> getClusterGroups() {
+ return instance.getMap(HAZELCAST_GROUPS);
+ }
+
+ private IMap<String, Object> getClusterGroupsConfig() {
+ return instance.getMap(HAZELCAST_GROUPS_CONFIG);
+ }
+
+ private void updateConfiguration(Configuration cfg, Dictionary<String, Object> properties) throws IOException {
+ cfg.update(properties);
+ LOGGER.debug("CELLAR HAZELCAST: updated configuration with pid: {}", cfg.getPid());
+ }
}