You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2013/10/31 15:59:07 UTC

svn commit: r1537519 - in /ace/trunk/org.apache.ace.repository: src/org/apache/ace/repository/task/RepositoryReplicationTask.java task.bnd

Author: jawi
Date: Thu Oct 31 14:59:07 2013
New Revision: 1537519

URL: http://svn.apache.org/r1537519
Log:
Some cleanups and minor improvements:

- log something when this task has done something;
- make use of a concurrent map instead of synchronized blocks;
- restructured the code to make it a little more readable;
- do not export anything from the generated bundle (there's no need for this).


Modified:
    ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java
    ace/trunk/org.apache.ace.repository/task.bnd

Modified: ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java?rev=1537519&r1=1537518&r2=1537519&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java (original)
+++ ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/task/RepositoryReplicationTask.java Thu Oct 31 14:59:07 2013
@@ -27,7 +27,8 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -40,102 +41,148 @@ import org.osgi.framework.ServiceReferen
 import org.osgi.service.log.LogService;
 
 /**
- * Repository replication task. Uses discovery to find the server it talks to.
- * Subsequently it checks which local repositories are configured and tries to
- * synchronize them with remote copies. Only pulls stuff in, it does not push
+ * Repository replication task. Uses discovery to find the server it talks to. Subsequently it checks which local
+ * repositories are configured and tries to synchronize them with remote copies. Only pulls stuff in, it does not push
  * stuff out.
  */
 public class RepositoryReplicationTask implements Runnable {
+    private final ConcurrentMap<ServiceReference, RepositoryReplication> m_replicators = new ConcurrentHashMap<ServiceReference, RepositoryReplication>();
+
     private volatile Discovery m_discovery;
     private volatile ConnectionFactory m_connectionFactory;
     private volatile LogService m_log;
-    private final Map<ServiceReference, RepositoryReplication> m_replicators = new HashMap<ServiceReference, RepositoryReplication>();
-    
+
+    /**
+     * Called by Felix DM when a {@link RepositoryReplication} service becomes available.
+     */
     public void add(ServiceReference ref, RepositoryReplication service) {
-        synchronized (m_replicators) {
-            m_replicators.put(ref,  service);
+        if (m_replicators.putIfAbsent(ref, service) != null) {
+            m_log.log(LogService.LOG_WARNING, "Ignoring duplicate repository replication service for '" + ref.getProperty("name") + "'!");
         }
     }
-    
-    public void remove(ServiceReference ref) {
-        synchronized (m_replicators) {
-            m_replicators.remove(ref);
+
+    /**
+     * Called by Felix DM when a {@link RepositoryReplication} service goes away.
+     */
+    public void remove(ServiceReference ref, RepositoryReplication service) {
+        if (!m_replicators.remove(ref, service)) {
+            m_log.log(LogService.LOG_WARNING, "Repository replication service '" + ref.getProperty("name") + "' not removed?!");
         }
     }
-    
+
+    /**
+     * Replicates all current known repositories.
+     */
     public void run() {
-        Entry<ServiceReference,RepositoryReplication>[] replicators;
-        synchronized (m_replicators) {
-            Set<Entry<ServiceReference,RepositoryReplication>> entries = m_replicators.entrySet();
-            replicators = entries.toArray(new Entry[entries.size()]);
+        // Take a snapshot of the current available replicators...
+        Map<ServiceReference, RepositoryReplication> replicators = new HashMap<ServiceReference, RepositoryReplication>(m_replicators);
+
+        // The URL to the server to replicate...
+        URL master = m_discovery.discover();
+
+        for (Entry<ServiceReference, RepositoryReplication> entry : replicators.entrySet()) {
+            RepositoryReplication repository = entry.getValue();
+            ServiceReference ref = entry.getKey();
+
+            try {
+                replicate(master, ref, repository);
+            }
+            catch (Exception e) {
+                m_log.log(LogService.LOG_WARNING, "Replicating repository '" + ref.getProperty("name") + "' failed!", e);
+            }
         }
-        
+    }
+
+    private HttpURLConnection createConnection(URL url) throws IOException {
+        return (HttpURLConnection) m_connectionFactory.createConnection(url);
+    }
+
+    private URL createGetURL(URL master, String customer, String name, long version) throws MalformedURLException {
+        return new URL(master, String.format("/replication/get?customer=%s&name=%s&version=%d", customer, name, version));
+    }
+
+    private URL createQueryURL(URL master, String customer, String name) throws MalformedURLException {
+        return new URL(master, String.format("/replication/query?customer=%s&name=%s", customer, name));
+    }
+
+    private boolean replicateRepository(URL master, String customer, String name, RepositoryReplication repository, HttpURLConnection queryConn) throws IOException {
+        SortedRangeSet localRange = repository.getRange();
+        boolean result = false;
+
+        BufferedReader reader = new BufferedReader(new InputStreamReader(queryConn.getInputStream()));
         try {
-            for (Entry<ServiceReference,RepositoryReplication> entry : replicators) {
-                replicate(entry);
+            String line = reader.readLine();
+            int i = line.lastIndexOf(',');
+            if (i <= 0) {
+                return result;
             }
+
+            SortedRangeSet remoteRange = new SortedRangeSet(line.substring(i + 1));
+
+            // check the limit of the repository
+            long limit = repository.getLimit();
+            if (limit == Long.MAX_VALUE) {
+                // no limit, sync all
+                SortedRangeSet delta = localRange.diffDest(remoteRange);
+                RangeIterator iterator = delta.iterator();
+                while (iterator.hasNext()) {
+                    long version = iterator.next();
+                    replicateVersion(master, customer, name, repository, version);
+                    result = true;
+                }
+            }
+            else {
+                // limit, try to get the the 'limit' newest versions
+                SortedRangeSet union = localRange.union(remoteRange);
+                RangeIterator iterator = union.reverseIterator();
+                while (iterator.hasNext() && limit > 0) {
+                    long version = iterator.next();
+                    if (!localRange.contains(version)) {
+                        replicateVersion(master, customer, name, repository, version);
+                    }
+                    limit--;
+                    result = true;
+                }
+            }
+
+            return result;
         }
-        catch (Exception e) {
-            m_log.log(LogService.LOG_WARNING, "Error while replicating", e);
+        finally {
+            reader.close();
         }
     }
 
-    private void replicate(Entry<ServiceReference, RepositoryReplication> entry) throws MalformedURLException, IOException {
-        ServiceReference ref = entry.getKey();
-        RepositoryReplication repository = entry.getValue();
-        String filter = "customer=" + ref.getProperty("customer") + "&name=" + ref.getProperty("name");
-        URL host = m_discovery.discover();
-        URL query = new URL(host, "/replication/query?" + filter);
-   
-        HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(query);
-        if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
-            SortedRangeSet localRange = repository.getRange();
-   
-            BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
-            try {
-                String line = reader.readLine();
-                int i = line.lastIndexOf(',');
-                if (i > 0) {
-                    SortedRangeSet remoteRange = new SortedRangeSet(line.substring(i + 1));
-        
-                    // check the limit of the repository
-                    long limit = repository.getLimit();
-                    if (limit == Long.MAX_VALUE) {
-                        // no limit, sync all
-                        SortedRangeSet delta = localRange.diffDest(remoteRange);
-                        RangeIterator iterator = delta.iterator();
-                        while (iterator.hasNext()) {
-                            long version = iterator.next();
-                            URL get = new URL(host, "/replication/get?" + filter + "&version=" + version);
-                            HttpURLConnection connection2 = (HttpURLConnection) m_connectionFactory.createConnection(get);
-                            repository.put(connection2.getInputStream(), version);
-                        }
-                    }
-                    else {
-                        // limit, try to get the the 'limit' newest versions
-                        SortedRangeSet union = localRange.union(remoteRange);
-                        RangeIterator iterator = union.reverseIterator();
-                        while (iterator.hasNext() && limit > 0) {
-                            long version = iterator.next();
-                            if (!localRange.contains(version)) {
-                                URL get = new URL(host, "/replication/get?" + filter + "&version=" + version);
-                                HttpURLConnection connection2 = (HttpURLConnection) m_connectionFactory.createConnection(get);
-                                repository.put(connection2.getInputStream(), version);
-                            }
-                            limit--;
-                        }
-                    }
+    private void replicate(URL master, ServiceReference ref, RepositoryReplication repository) throws IOException {
+        String customer = (String) ref.getProperty("customer");
+        String name = (String) ref.getProperty("name");
+
+        HttpURLConnection connection = createConnection(createQueryURL(master, customer, name));
+        try {
+            int rc = connection.getResponseCode();
+            if (rc == HttpServletResponse.SC_OK) {
+                if (replicateRepository(master, customer, name, repository, connection)) {
+                    m_log.log(LogService.LOG_DEBUG, String.format("Repository '%s' (%s) successfully replicated...", name, customer));
                 }
             }
-            catch (Exception e) {
-                m_log.log(LogService.LOG_WARNING, "Error parsing remote range", e);
-            }
-            finally {
-                reader.close();
+            else {
+                String msg = connection.getResponseMessage();
+                m_log.log(LogService.LOG_WARNING, String.format("Could not replicate repository '%s' (%s). Server response: %s (%d)", name, customer, msg, rc));
             }
         }
-        else {
-            m_log.log(LogService.LOG_WARNING, "Could not sync repository for customer: " + ref.getProperty("customer") + ", name: " + ref.getProperty("name") + ", because: " + connection.getResponseMessage() + " (" + connection.getResponseCode() + ")");
+        finally {
+            connection.disconnect();
+        }
+    }
+
+    private void replicateVersion(URL master, String customer, String name, RepositoryReplication repository, long version) throws IOException {
+        HttpURLConnection conn = createConnection(createGetURL(master, customer, name, version));
+        try {
+            repository.put(conn.getInputStream(), version);
+
+            m_log.log(LogService.LOG_DEBUG, String.format("\tVersion %d of repository '%s' (%s) successfully replicated...", version, name, customer));
+        }
+        finally {
+            conn.disconnect();
         }
     }
-}
\ No newline at end of file
+}

Modified: ace/trunk/org.apache.ace.repository/task.bnd
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/task.bnd?rev=1537519&r1=1537518&r2=1537519&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository/task.bnd (original)
+++ ace/trunk/org.apache.ace.repository/task.bnd Thu Oct 31 14:59:07 2013
@@ -1,5 +1,5 @@
 Bundle-Activator: org.apache.ace.repository.task.Activator
-Export-Package: org.apache.ace.repository.task
+Private-Package: org.apache.ace.repository.task
 Bundle-Version: 1.0.0
 Bundle-Name: Apache ACE Repository Task
 Bundle-Description: Registers a runnable task for repository replication
\ No newline at end of file