You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2013/10/31 15:59:07 UTC
svn commit: r1537519 - in /ace/trunk/org.apache.ace.repository:
src/org/apache/ace/repository/task/RepositoryReplicationTask.java task.bnd
Author: jawi
Date: Thu Oct 31 14:59:07 2013
New Revision: 1537519
URL: http://svn.apache.org/r1537519
Log:
Some cleanups and minor improvements:
- log something when this task has done something;
- make use of a concurrent map instead of synchronized blocks;
- restructured the code to make it a little more readable;
- do not export anything from the generated bundle (there's no need for this).
Modified:
ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java
ace/trunk/org.apache.ace.repository/task.bnd
Modified: ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java?rev=1537519&r1=1537518&r2=1537519&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java (original)
+++ ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java Thu Oct 31 14:59:07 2013
@@ -27,7 +27,8 @@ import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletResponse;
@@ -40,102 +41,148 @@ import org.osgi.framework.ServiceReferen
import org.osgi.service.log.LogService;
/**
- * Repository replication task. Uses discovery to find the server it talks to.
- * Subsequently it checks which local repositories are configured and tries to
- * synchronize them with remote copies. Only pulls stuff in, it does not push
+ * Repository replication task. Uses discovery to find the server it talks to. Subsequently it checks which local
+ * repositories are configured and tries to synchronize them with remote copies. Only pulls stuff in, it does not push
* stuff out.
*/
public class RepositoryReplicationTask implements Runnable {
+ private final ConcurrentMap<ServiceReference, RepositoryReplication> m_replicators = new ConcurrentHashMap<ServiceReference, RepositoryReplication>();
+
private volatile Discovery m_discovery;
private volatile ConnectionFactory m_connectionFactory;
private volatile LogService m_log;
- private final Map<ServiceReference, RepositoryReplication> m_replicators = new HashMap<ServiceReference, RepositoryReplication>();
-
+
+ /**
+ * Called by Felix DM when a {@link RepositoryReplication} service becomes available.
+ */
public void add(ServiceReference ref, RepositoryReplication service) {
- synchronized (m_replicators) {
- m_replicators.put(ref, service);
+ if (m_replicators.putIfAbsent(ref, service) != null) {
+ m_log.log(LogService.LOG_WARNING, "Ignoring duplicate repository replication service for '" + ref.getProperty("name") + "'!");
}
}
-
- public void remove(ServiceReference ref) {
- synchronized (m_replicators) {
- m_replicators.remove(ref);
+
+ /**
+ * Called by Felix DM when a {@link RepositoryReplication} service goes away.
+ */
+ public void remove(ServiceReference ref, RepositoryReplication service) {
+ if (!m_replicators.remove(ref, service)) {
+ m_log.log(LogService.LOG_WARNING, "Repository replication service '" + ref.getProperty("name") + "' not removed?!");
}
}
-
+
+ /**
+ * Replicates all current known repositories.
+ */
public void run() {
- Entry<ServiceReference,RepositoryReplication>[] replicators;
- synchronized (m_replicators) {
- Set<Entry<ServiceReference,RepositoryReplication>> entries = m_replicators.entrySet();
- replicators = entries.toArray(new Entry[entries.size()]);
+ // Take a snapshot of the current available replicators...
+ Map<ServiceReference, RepositoryReplication> replicators = new HashMap<ServiceReference, RepositoryReplication>(m_replicators);
+
+ // The URL to the server to replicate...
+ URL master = m_discovery.discover();
+
+ for (Entry<ServiceReference, RepositoryReplication> entry : replicators.entrySet()) {
+ RepositoryReplication repository = entry.getValue();
+ ServiceReference ref = entry.getKey();
+
+ try {
+ replicate(master, ref, repository);
+ }
+ catch (Exception e) {
+ m_log.log(LogService.LOG_WARNING, "Replicating repository '" + ref.getProperty("name") + "' failed!", e);
+ }
}
-
+ }
+
+ private HttpURLConnection createConnection(URL url) throws IOException {
+ return (HttpURLConnection) m_connectionFactory.createConnection(url);
+ }
+
+ private URL createGetURL(URL master, String customer, String name, long version) throws MalformedURLException {
+ return new URL(master, String.format("/replication/get?customer=%s&name=%s&version=%d", customer, name, version));
+ }
+
+ private URL createQueryURL(URL master, String customer, String name) throws MalformedURLException {
+ return new URL(master, String.format("/replication/query?customer=%s&name=%s", customer, name));
+ }
+
+ private boolean replicateRepository(URL master, String customer, String name, RepositoryReplication repository, HttpURLConnection queryConn) throws IOException {
+ SortedRangeSet localRange = repository.getRange();
+ boolean result = false;
+
+ BufferedReader reader = new BufferedReader(new InputStreamReader(queryConn.getInputStream()));
try {
- for (Entry<ServiceReference,RepositoryReplication> entry : replicators) {
- replicate(entry);
+ String line = reader.readLine();
+ int i = line.lastIndexOf(',');
+ if (i <= 0) {
+ return result;
}
+
+ SortedRangeSet remoteRange = new SortedRangeSet(line.substring(i + 1));
+
+ // check the limit of the repository
+ long limit = repository.getLimit();
+ if (limit == Long.MAX_VALUE) {
+ // no limit, sync all
+ SortedRangeSet delta = localRange.diffDest(remoteRange);
+ RangeIterator iterator = delta.iterator();
+ while (iterator.hasNext()) {
+ long version = iterator.next();
+ replicateVersion(master, customer, name, repository, version);
+ result = true;
+ }
+ }
+ else {
+ // limit, try to get the the 'limit' newest versions
+ SortedRangeSet union = localRange.union(remoteRange);
+ RangeIterator iterator = union.reverseIterator();
+ while (iterator.hasNext() && limit > 0) {
+ long version = iterator.next();
+ if (!localRange.contains(version)) {
+ replicateVersion(master, customer, name, repository, version);
+ }
+ limit--;
+ result = true;
+ }
+ }
+
+ return result;
}
- catch (Exception e) {
- m_log.log(LogService.LOG_WARNING, "Error while replicating", e);
+ finally {
+ reader.close();
}
}
- private void replicate(Entry<ServiceReference, RepositoryReplication> entry) throws MalformedURLException, IOException {
- ServiceReference ref = entry.getKey();
- RepositoryReplication repository = entry.getValue();
- String filter = "customer=" + ref.getProperty("customer") + "&name=" + ref.getProperty("name");
- URL host = m_discovery.discover();
- URL query = new URL(host, "/replication/query?" + filter);
-
- HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(query);
- if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
- SortedRangeSet localRange = repository.getRange();
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
- try {
- String line = reader.readLine();
- int i = line.lastIndexOf(',');
- if (i > 0) {
- SortedRangeSet remoteRange = new SortedRangeSet(line.substring(i + 1));
-
- // check the limit of the repository
- long limit = repository.getLimit();
- if (limit == Long.MAX_VALUE) {
- // no limit, sync all
- SortedRangeSet delta = localRange.diffDest(remoteRange);
- RangeIterator iterator = delta.iterator();
- while (iterator.hasNext()) {
- long version = iterator.next();
- URL get = new URL(host, "/replication/get?" + filter + "&version=" + version);
- HttpURLConnection connection2 = (HttpURLConnection) m_connectionFactory.createConnection(get);
- repository.put(connection2.getInputStream(), version);
- }
- }
- else {
- // limit, try to get the the 'limit' newest versions
- SortedRangeSet union = localRange.union(remoteRange);
- RangeIterator iterator = union.reverseIterator();
- while (iterator.hasNext() && limit > 0) {
- long version = iterator.next();
- if (!localRange.contains(version)) {
- URL get = new URL(host, "/replication/get?" + filter + "&version=" + version);
- HttpURLConnection connection2 = (HttpURLConnection) m_connectionFactory.createConnection(get);
- repository.put(connection2.getInputStream(), version);
- }
- limit--;
- }
- }
+ private void replicate(URL master, ServiceReference ref, RepositoryReplication repository) throws IOException {
+ String customer = (String) ref.getProperty("customer");
+ String name = (String) ref.getProperty("name");
+
+ HttpURLConnection connection = createConnection(createQueryURL(master, customer, name));
+ try {
+ int rc = connection.getResponseCode();
+ if (rc == HttpServletResponse.SC_OK) {
+ if (replicateRepository(master, customer, name, repository, connection)) {
+ m_log.log(LogService.LOG_DEBUG, String.format("Repository '%s' (%s) successfully replicated...", name, customer));
}
}
- catch (Exception e) {
- m_log.log(LogService.LOG_WARNING, "Error parsing remote range", e);
- }
- finally {
- reader.close();
+ else {
+ String msg = connection.getResponseMessage();
+ m_log.log(LogService.LOG_WARNING, String.format("Could not replicate repository '%s' (%s). Server response: %s (%d)", name, customer, msg, rc));
}
}
- else {
- m_log.log(LogService.LOG_WARNING, "Could not sync repository for customer: " + ref.getProperty("customer") + ", name: " + ref.getProperty("name") + ", because: " + connection.getResponseMessage() + " (" + connection.getResponseCode() + ")");
+ finally {
+ connection.disconnect();
+ }
+ }
+
+ private void replicateVersion(URL master, String customer, String name, RepositoryReplication repository, long version) throws IOException {
+ HttpURLConnection conn = createConnection(createGetURL(master, customer, name, version));
+ try {
+ repository.put(conn.getInputStream(), version);
+
+ m_log.log(LogService.LOG_DEBUG, String.format("\tVersion %d of repository '%s' (%s) successfully replicated...", version, name, customer));
+ }
+ finally {
+ conn.disconnect();
}
}
-}
\ No newline at end of file
+}
Modified: ace/trunk/org.apache.ace.repository/task.bnd
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/task.bnd?rev=1537519&r1=1537518&r2=1537519&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository/task.bnd (original)
+++ ace/trunk/org.apache.ace.repository/task.bnd Thu Oct 31 14:59:07 2013
@@ -1,5 +1,5 @@
Bundle-Activator: org.apache.ace.repository.task.Activator
-Export-Package: org.apache.ace.repository.task
+Private-Package: org.apache.ace.repository.task
Bundle-Version: 1.0.0
Bundle-Name: Apache ACE Repository Task
Bundle-Description: Registers a runnable task for repository replication
\ No newline at end of file