You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2016/09/06 17:05:49 UTC
karaf-cellar git commit: [KARAF-4698] Improve synchronizers. This
closes #34
Repository: karaf-cellar
Updated Branches:
refs/heads/master 355b714e1 -> 790c22bd0
[KARAF-4698] Improve synchronizers. This closes #34
Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/790c22bd
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/790c22bd
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/790c22bd
Branch: refs/heads/master
Commit: 790c22bd01f4999548b10520c2e993680623deb3
Parents: 355b714
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Tue Sep 6 19:05:09 2016 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Tue Sep 6 19:05:09 2016 +0200
----------------------------------------------------------------------
.../karaf/cellar/bundle/BundleSynchronizer.java | 59 ++++++++++++++++++--
.../config/ConfigurationSynchronizer.java | 42 ++++++++++++--
.../cellar/features/FeaturesSynchronizer.java | 31 ++++++++--
3 files changed, 117 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
----------------------------------------------------------------------
diff --git a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
index 6771d53..a234672 100644
--- a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
+++ b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
@@ -74,8 +74,12 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
}
if (policy.equalsIgnoreCase("cluster")) {
LOGGER.debug("CELLAR BUNDLE: sync policy set as 'cluster' for cluster group {}", group.getName());
- LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull first)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull first)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR BUNDLE: node is the only one in the cluster group, no pull");
+ }
LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node (push after)");
push(group);
} else if (policy.equalsIgnoreCase("node")) {
@@ -86,8 +90,12 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
pull(group);
} else if (policy.equalsIgnoreCase("clusterOnly")) {
LOGGER.debug("CELLAR BUNDLE: sync policy set as 'clusterOnly' for cluster group " + group.getName());
- LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull only)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull only)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR BUNDLE: node is the only one in the cluster group, no pull");
+ }
} else if (policy.equalsIgnoreCase("nodeOnly")) {
LOGGER.debug("CELLAR BUNDLE: sync policy set as 'nodeOnly' for cluster group " + group.getName());
LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node (push only)");
@@ -113,6 +121,7 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
+ // get the bundles on the cluster to update local bundles
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
for (Map.Entry<String, BundleState> entry : clusterBundles.entrySet()) {
String id = entry.getKey();
@@ -152,6 +161,18 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
}
}
}
+ // cleanup the local bundles not present on the cluster
+ for (Bundle bundle : bundleContext.getBundles()) {
+ String id = getId(bundle);
+ if (!clusterBundles.containsKey(id)) {
+ // the bundle is not present on the cluster, so it has to be uninstalled locally
+ try {
+ bundle.uninstall();
+ } catch (Exception e) {
+ LOGGER.warn("Can't uninstall {}", id, e);
+ }
+ }
+ }
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -183,13 +204,15 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
BundleContext bundleContext = ((BundleReference) getClass().getClassLoader()).getBundle().getBundleContext();
bundles = bundleContext.getBundles();
+ // push local bundles to the cluster
for (Bundle bundle : bundles) {
long bundleId = bundle.getBundleId();
String symbolicName = bundle.getSymbolicName();
String version = bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION);
String bundleLocation = bundle.getLocation();
int status = bundle.getState();
- String id = symbolicName + "/" + version;
+
+ String id = getId(bundle);
// check if the pid is marked as local.
if (isAllowed(group, Constants.CATEGORY, bundleLocation, EventType.OUTBOUND)) {
@@ -234,6 +257,20 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
} else LOGGER.trace("CELLAR BUNDLE: bundle {} is marked BLOCKED OUTBOUND for cluster group {}", bundleLocation, groupName);
}
+ // clean bundles on the cluster not present locally
+ for (String id : clusterBundles.keySet()) {
+ boolean found = false;
+ for (Bundle bundle : bundleContext.getBundles()) {
+ String localBundleId = getId(bundle);
+ if (id.equals(localBundleId)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ clusterBundles.remove(id);
+ }
+ }
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -241,6 +278,18 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
}
/**
+ * Return the Cellar bundle ID for a given bundle.
+ *
+ * @param bundle The bundle.
+ * @return The Cellar bundle ID.
+ */
+ private String getId(Bundle bundle) {
+ String symbolicName = bundle.getSymbolicName();
+ String version = bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION);
+ return symbolicName + "/" + version;
+ }
+
+ /**
* Get the bundle sync policy for the given cluster group.
*
* @param group the cluster group.
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
index 75cb4c3..6bab460 100644
--- a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
+++ b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
@@ -72,8 +72,12 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
}
if (policy.equalsIgnoreCase("cluster")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'cluster' for cluster group {}", group.getName());
- LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull first)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull first)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull");
+ }
LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push after)");
push(group);
} else if (policy.equalsIgnoreCase("node")) {
@@ -84,8 +88,12 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
pull(group);
} else if (policy.equalsIgnoreCase("clusterOnly")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'clusterOnly' for cluster group " + group.getName());
- LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull only)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull only)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull");
+ }
} else if (policy.equalsIgnoreCase("nodeOnly")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'nodeOnly' for cluster group " + group.getName());
LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push only)");
@@ -112,6 +120,7 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ // get configurations on the cluster to update local configurations
for (String pid : clusterConfigurations.keySet()) {
if (isAllowed(group, Constants.CATEGORY, pid, EventType.INBOUND)) {
Dictionary clusterDictionary = clusterConfigurations.get(pid);
@@ -133,6 +142,17 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
}
} else LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED INBOUND for cluster group {}", pid, groupName);
}
+ // cleanup the local configurations not present on the cluster
+ try {
+ for (Configuration configuration : configurationAdmin.listConfigurations(null)) {
+ String pid = configuration.getPid();
+ if (!clusterConfigurations.containsKey(pid)) {
+ configuration.delete();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Can't get local configurations", e);
+ }
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -162,6 +182,7 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
Configuration[] localConfigurations;
try {
localConfigurations = configurationAdmin.listConfigurations(null);
+ // push local configurations to the cluster
for (Configuration localConfiguration : localConfigurations) {
String pid = localConfiguration.getPid();
// check if the pid is marked as local.
@@ -195,6 +216,19 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
} else
LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, groupName);
}
+ // clean configurations on the cluster not present locally
+ for (String pid : clusterConfigurations.keySet()) {
+ boolean found = false;
+ for (Configuration configuration : configurationAdmin.listConfigurations(null)) {
+ if (configuration.getPid().equals(pid)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ clusterConfigurations.remove(pid);
+ }
+ }
} catch (IOException ex) {
LOGGER.error("CELLAR CONFIG: failed to read configuration (IO error)", ex);
} catch (InvalidSyntaxException ex) {
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
----------------------------------------------------------------------
diff --git a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
index be4df54..c25faf8 100644
--- a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
+++ b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
@@ -77,8 +77,12 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize
}
if (policy.equalsIgnoreCase("cluster")) {
LOGGER.debug("CELLAR FEATURE: sync policy set as 'cluster' for cluster group {}", group.getName());
- LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull first)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull first)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR FEATURE: node is the first one in the cluster group, no pull");
+ }
LOGGER.debug("CELLAR FEATURE: updating cluster from the local node (push after)");
push(group);
} else if (policy.equalsIgnoreCase("node")) {
@@ -89,8 +93,12 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize
pull(group);
} else if (policy.equalsIgnoreCase("clusterOnly")) {
LOGGER.debug("CELLAR FEATURE: sync policy set as 'clusterOnly' for cluster group " + group.getName());
- LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull only)");
- pull(group);
+ if (clusterManager.listNodesByGroup(group).size() > 1) {
+ LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull only)");
+ pull(group);
+ } else {
+ LOGGER.debug("CELLAR FEATURE: node is the first one in the cluster group, no pull");
+ }
} else if (policy.equalsIgnoreCase("nodeOnly")) {
LOGGER.debug("CELLAR FEATURE: sync policy set as 'nodeOnly' for cluster group " + group.getName());
LOGGER.debug("CELLAR FEATURE: updating cluster from the local node (push only)");
@@ -118,8 +126,8 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize
Map<String, String> clusterRepositories = clusterManager.getMap(Constants.REPOSITORIES_MAP + Configurations.SEPARATOR + groupName);
Map<String, FeatureState> clusterFeatures = clusterManager.getMap(Constants.FEATURES_MAP + Configurations.SEPARATOR + groupName);
- // get the features repositories URLs from the cluster group
if (clusterRepositories != null && !clusterRepositories.isEmpty()) {
+ // get the features repositories from the cluster to update locally
for (String url : clusterRepositories.keySet()) {
try {
if (!isRepositoryRegisteredLocally(url)) {
@@ -132,10 +140,21 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize
LOGGER.error("CELLAR FEATURE: failed to add repository URL {}", url, e);
}
}
+ // cleanup the local features repositories not present on the cluster
+ try {
+ for (Repository repository : featuresService.listRepositories()) {
+ URI uri = repository.getURI();
+ if (!clusterRepositories.containsKey(uri.toString())) {
+ featuresService.removeRepository(uri);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Can't get local features repositories", e);
+ }
}
- // get the features from the cluster group
if (clusterFeatures != null && !clusterFeatures.isEmpty()) {
+ // get the features from the cluster group and update locally
for (FeatureState state : clusterFeatures.values()) {
String name = state.getName();
// check if feature is blocked