You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2013/10/29 12:11:16 UTC

svn commit: r1536664 - /ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java

Author: jawi
Date: Tue Oct 29 11:11:15 2013
New Revision: 1536664

URL: http://svn.apache.org/r1536664
Log:
ACE-389 - configure scaling with relays:

- cleaned up the log-sync task a bit, made sure all resources
  are properly closed, and that we correctly synchronize with
  the server (by checking the response codes).


Modified:
    ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java

Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java?rev=1536664&r1=1536663&r2=1536664&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java Tue Oct 29 11:11:15 2013
@@ -20,12 +20,14 @@ package org.apache.ace.log.server.task;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -33,6 +35,8 @@ import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.ace.connectionfactory.ConnectionFactory;
 import org.apache.ace.discovery.Discovery;
 import org.apache.ace.feedback.Descriptor;
@@ -44,100 +48,163 @@ import org.osgi.service.log.LogService;
 
 public class LogSyncTask implements Runnable, LogSync {
 
+    public static enum Mode {
+        PUSH, PULL, PUSHPULL
+    }
+
     private static final String COMMAND_QUERY = "query";
     private static final String COMMAND_SEND = "send";
-    private static final String COMMAND_RECEIVE = "receive";
 
+    private static final String COMMAND_RECEIVE = "receive";
     private static final String TARGETID_KEY = "tid";
     @SuppressWarnings("unused")
     private static final String FILTER_KEY = "filter";
     private static final String LOGID_KEY = "logid";
-    private static final String RANGE_KEY = "range";
 
+    private static final String RANGE_KEY = "range";
     // injected by dependencymanager
     private volatile Discovery m_discovery;
     private volatile LogService m_log;
     private volatile LogStore m_logStore;
+
     private volatile ConnectionFactory m_connectionFactory;
-    
     private final String m_endpoint;
     private final String m_name;
-	private final Mode m_mode;
 
-    public static enum Mode { PUSH, PULL, PUSHPULL };
-    
+    private final Mode m_mode;;
+
     public LogSyncTask(String endpoint, String name, Mode mode) {
         m_endpoint = endpoint;
         m_name = name;
-		m_mode = mode;
+        m_mode = mode;
+    }
+
+    public String getName() {
+        return m_name;
+    }
+
+    public boolean pull() throws IOException {
+        return synchronize(false /* push */, true /* pull */);
+    }
+
+    public boolean push() throws IOException {
+        return synchronize(true /* push */, false /* pull */);
+    }
+
+    public boolean pushpull() throws IOException {
+        return synchronize(true /* push */, true /* pull */);
     }
 
     public void run() {
         try {
-        	switch (m_mode) {
-	        	case PULL:
-	        		pull();
-	        		break;
-	        	case PUSH:
-	        		push();
-	        		break;
-	        	case PUSHPULL:
-	        		pushpull();
-	        		break;
-        	}
+            switch (m_mode) {
+                case PULL:
+                    pull();
+                    break;
+                case PUSH:
+                    push();
+                    break;
+                case PUSHPULL:
+                    pushpull();
+                    break;
+            }
         }
         catch (MalformedURLException e) {
-            m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote");
+            m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote (malformed URL, incorrect configuration?)");
+        }
+        catch (ConnectException e) {
+            m_log.log(LogService.LOG_WARNING, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote (connection refused, remote not up?)");
         }
         catch (IOException e) {
             m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote", e);
         }
     }
 
-    public boolean pull() throws IOException {
-        return synchronize(false, true);
+    /**
+     * Calculates the difference between two lists of <code>Descriptor</code>. The result will contain whatever is not
+     * in <code>destination</code>, but is in <code>source</code>.
+     */
+    protected List<Descriptor> calculateDelta(List<Descriptor> source, List<Descriptor> destination) {
+        /*
+         * For each local descriptor, we try to find a matching remote one. If so, we will synchronize all events that
+         * the remote does not have. If we do not find a matching one at all, we send the complete local log.
+         */
+        List<Descriptor> result = new ArrayList<Descriptor>();
+        for (Descriptor s : source) {
+            Descriptor diffs = s;
+            for (Descriptor d : destination) {
+                if ((s.getStoreID() == d.getStoreID()) && (s.getTargetID().equals(d.getTargetID()))) {
+                    SortedRangeSet rangeDiff = d.getRangeSet().diffDest(s.getRangeSet());
+                    if (!isEmptyRangeSet(rangeDiff)) {
+                        diffs = new Descriptor(s.getTargetID(), s.getStoreID(), rangeDiff);
+                    }
+                    else {
+                        diffs = null;
+                    }
+                }
+            }
+            if (diffs != null) {
+                result.add(diffs);
+            }
+        }
+        return result;
     }
 
-    public boolean push() throws IOException {
-        return synchronize(true, false);
-    }
+    protected boolean doPull(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
+        List<Descriptor> delta = calculateDelta(remoteRanges, localRanges);
 
-    public boolean pushpull() throws IOException {
-        return synchronize(true, true);
-    }
+        boolean result = !delta.isEmpty();
+        for (Descriptor descriptor : delta) {
+            InputStream receiveInput = null;
+            URLConnection receiveConnection = null;
+            try {
+                /*
+                 * The request currently contains a range. This is not yet supported by the servlet, but it will simply
+                 * be ignored.
+                 */
+                URL url = createReceiveURL(host, descriptor);
 
-    /**
-     * Synchronizes the local store with the discovered remote one.
-     * @throws java.io.IOException
-     */
-    private boolean synchronize(boolean push, boolean pull) throws IOException {
-        URL host = m_discovery.discover();
+                receiveConnection = createConnection(url);
+                receiveInput = receiveConnection.getInputStream();
 
-        URLConnection queryConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_QUERY));
-        InputStream queryInput = queryConnection.getInputStream();
+                BufferedReader reader = new BufferedReader(new InputStreamReader(receiveInput));
+                try {
+                    readLogs(reader);
+                }
+                finally {
+                    reader.close();
+                }
 
-        List<Descriptor> localRanges = m_logStore.getDescriptors();
-        List<Descriptor> remoteRanges = getRanges(queryInput);
+                if (receiveConnection instanceof HttpURLConnection) {
+                    // Will cause a flush and reads the response from the server...
+                    result = ((HttpURLConnection) receiveConnection).getResponseCode() == HttpServletResponse.SC_OK;
+                }
 
-        boolean result = false;
-        if (push) {
-            result |= doPush(host, localRanges, remoteRanges);
-        }
-        if (pull) {
-            result |= doPull(host, localRanges, remoteRanges);
+                m_log.log(LogService.LOG_DEBUG, "Pulled log (" + m_name + ") successfully from remote...");
+            }
+            catch (IOException e) {
+                m_log.log(LogService.LOG_ERROR, "Unable to connect to retrieve log events.", e);
+            }
+            finally {
+                closeSilently(receiveInput);
+                closeSilently(receiveConnection);
+            }
         }
         return result;
     }
 
     protected boolean doPush(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
-        boolean result = false;
+        List<Descriptor> delta = calculateDelta(localRanges, remoteRanges);
+        boolean result = !delta.isEmpty();
+
         OutputStream sendOutput = null;
+        URLConnection sendConnection = null;
+
         try {
-            URLConnection sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
-            
+            sendConnection = createConnection(createURL(host, COMMAND_SEND));
             if (sendConnection instanceof HttpURLConnection) {
                 // ACE-294: enable streaming mode causing only small amounts of memory to be
-                // used for this commit. Otherwise, the entire input stream is cached into 
+                // used for this commit. Otherwise, the entire input stream is cached into
                 // memory prior to sending it to the server...
                 ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
             }
@@ -146,180 +213,167 @@ public class LogSyncTask implements Runn
             sendOutput = sendConnection.getOutputStream();
 
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendOutput));
-            List<Descriptor> delta = calculateDelta(localRanges, remoteRanges);
-            result = !delta.isEmpty();
-            writeDelta(delta, writer);
-
-            sendOutput.flush();
-            sendOutput.close();
+            try {
+                writeDelta(delta, writer);
+            }
+            finally {
+                writer.close();
+            }
 
             if (sendConnection instanceof HttpURLConnection) {
-                HttpURLConnection conn = (HttpURLConnection) sendConnection;
-                conn.getContent();
-                conn.disconnect();
+                // Will cause a flush and reads the response from the server...
+                result = ((HttpURLConnection) sendConnection).getResponseCode() == HttpServletResponse.SC_OK;
             }
+
+            m_log.log(LogService.LOG_DEBUG, "Pushed log (" + m_name + ") successfully to remote...");
         }
         catch (IOException e) {
             m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote", e);
         }
         finally {
-            if (sendOutput != null) {
+            closeSilently(sendOutput);
+            closeSilently(sendConnection);
+        }
+        return result;
+    }
+
+    protected List<Descriptor> getRanges(URL host) throws IOException {
+        List<Descriptor> result = new ArrayList<Descriptor>();
+
+        URLConnection queryConnection = null;
+        InputStream queryInput = null;
+        try {
+            queryConnection = createConnection(createURL(host, COMMAND_QUERY));
+            queryInput = queryConnection.getInputStream();
+
+            BufferedReader queryReader = new BufferedReader(new InputStreamReader(queryInput));
+
+            for (String line = queryReader.readLine(); line != null; line = queryReader.readLine()) {
                 try {
-                    sendOutput.close();
+                    result.add(new Descriptor(line));
                 }
-                catch (Exception ex) {
-                    // not much we can do
+                catch (IllegalArgumentException iae) {
+                    queryReader.close();
+
+                    throw new IOException("Could not determine highest remote event id, received malformed event range: " + line);
                 }
             }
         }
+        finally {
+            closeSilently(queryInput);
+            closeSilently(queryConnection);
+        }
         return result;
+
+    }
+
+    protected void readLogs(BufferedReader reader) {
+        try {
+            List<Event> events = new ArrayList<Event>();
+
+            String eventString = null;
+            while ((eventString = reader.readLine()) != null) {
+                try {
+                    events.add(new Event(eventString));
+                }
+                catch (IllegalArgumentException e) {
+                    // Just skip this one.
+                }
+            }
+            m_logStore.put(events);
+        }
+        catch (IOException e) {
+            m_log.log(LogService.LOG_DEBUG, "Error reading line from reader", e);
+        }
+
     }
 
     /**
      * Writes the difference between local and remote to a writer.
-     * @param descriptors A list of Descriptors that identifies all local log entries that need to be written.
-     * @param writer A writer to write to.
+     * 
+     * @param descriptors
+     *            A list of Descriptors that identifies all local log entries that need to be written.
+     * @param writer
+     *            A writer to write to.
      * @throws java.io.IOException
      */
     protected void writeDelta(List<Descriptor> descriptors, Writer writer) throws IOException {
         for (Descriptor l : descriptors) {
             writeDescriptor(l, writer);
         }
+        writer.flush();
     }
 
     /**
      * Writes the Events described by the descriptor to the writer.
-     * @param descriptor A Descriptor that identifies the events to be written.
-     * @param writer A writer to write the events to.
-     * @throws java.io.IOException Thrown when either the writer goes wrong, or there is a problem
-     * communicating with the local log store.
+     * 
+     * @param descriptor
+     *            A Descriptor that identifies the events to be written.
+     * @param writer
+     *            A writer to write the events to.
+     * @throws java.io.IOException
+     *             Thrown when either the writer goes wrong, or there is a problem communicating with the local log
+     *             store.
      */
     protected void writeDescriptor(Descriptor descriptor, Writer writer) throws IOException {
         List<Event> events = m_logStore.get(descriptor);
         for (Event event : events) {
             writer.write(event.toRepresentation() + "\n");
         }
-        writer.flush();
     }
 
-    protected boolean doPull(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
-        boolean result = false;
-        List<Descriptor> delta = calculateDelta(remoteRanges, localRanges);
-        result = !delta.isEmpty();
-        for (Descriptor l : delta) {
+    private void closeSilently(Closeable resource) {
+        if (resource != null) {
             try {
-                /*
-                 * The request currently contains a range. This is not yet supported by the servlet, but it will
-                 * simply be ignored.
-                 */
-                URL url = new URL(host, m_endpoint + "/" + COMMAND_RECEIVE + "?" + TARGETID_KEY + "=" + l.getTargetID() + "&" + LOGID_KEY + "=" + l.getStoreID() + "&" + RANGE_KEY + "=" + l.getRangeSet().toRepresentation());
-                
-                URLConnection receiveConnection = m_connectionFactory.createConnection(url);
-                InputStream receiveInput = receiveConnection.getInputStream();
-                
-                BufferedReader reader = new BufferedReader(new InputStreamReader(receiveInput));
-                readLogs(reader);
-                
-                if (receiveConnection instanceof HttpURLConnection) {
-                    HttpURLConnection conn = (HttpURLConnection) receiveConnection;
-                    conn.getContent();
-                    conn.disconnect();
-                }
+                resource.close();
             }
-            catch (IOException e) {
-                m_log.log(LogService.LOG_ERROR, "Unable to connect to retrieve log events.", e);
+            catch (IOException exception) {
+                // Ignore, not much we can do...
             }
         }
-        return result;
     }
 
-    protected void readLogs(BufferedReader reader) {
-        try {
-            List<Event> events = new ArrayList<Event>();
-
-            String eventString = null;
-            while ((eventString = reader.readLine()) != null) {
-                try {
-                    Event event = new Event(eventString);
-                    events.add(event);
-                }
-                catch (IllegalArgumentException e) {
-                    // Just skip this one.
-                }
-            }
-            m_logStore.put(events);
-        }
-        catch (IOException e) {
-            m_log.log(LogService.LOG_DEBUG, "Error reading line from reader", e);
+    private void closeSilently(URLConnection resource) {
+        if (resource instanceof HttpURLConnection) {
+            ((HttpURLConnection) resource).disconnect();
         }
+    }
 
+    private URLConnection createConnection(URL url) throws IOException {
+        return m_connectionFactory.createConnection(url);
     }
 
-    /**
-     * Calculates the difference between two lists of <code>Descriptor</code>. The result will contain whatever is
-     * not in <code>destination</code>, but is in <code>source</code>.
-     */
-    protected List<Descriptor> calculateDelta(List<Descriptor> source, List<Descriptor> destination) {
-        /*
-         * For each local descriptor, we try to find a matching remote one. If so, we will synchronize all events
-         * that the remote does not have. If we do not find a matching one at all, we send the complete local
-         * log.
-         */
-        List<Descriptor> result = new ArrayList<Descriptor>();
-        for (Descriptor s : source) {
-            Descriptor diffs = s;
-            for (Descriptor d : destination) {
-                if ((s.getStoreID() == d.getStoreID()) && (s.getTargetID().equals(d.getTargetID()))) {
-                    SortedRangeSet rangeDiff = d.getRangeSet().diffDest(s.getRangeSet());
-                    if (!isEmptyRangeSet(rangeDiff)) {
-                        diffs = new Descriptor(s.getTargetID(), s.getStoreID(), rangeDiff);
-                    }
-                    else {
-                        diffs = null;
-                    }
-                }
-            }
-            if (diffs != null) {
-                result.add(diffs);
-            }
-        }
-        return result;
+    private URL createReceiveURL(URL host, Descriptor l) throws MalformedURLException {
+        return new URL(host, String.format("%s/%s?%s=%s&%s=%s&%s=%s", m_endpoint, COMMAND_RECEIVE, TARGETID_KEY, l.getTargetID(), LOGID_KEY, l.getStoreID(), RANGE_KEY, l.getRangeSet().toRepresentation()));
+    }
+
+    private URL createURL(URL host, String command) throws MalformedURLException {
+        return new URL(host, m_endpoint.concat("/").concat(command));
     }
 
     private boolean isEmptyRangeSet(SortedRangeSet set) {
         return !set.iterator().hasNext();
     }
 
-    protected List<Descriptor> getRanges(InputStream stream) throws IOException {
-        List<Descriptor> result = new ArrayList<Descriptor>();
-        BufferedReader queryReader = null;
-        try {
-            queryReader = new BufferedReader(new InputStreamReader(stream));
+    /**
+     * Synchronizes the local store with the discovered remote one.
+     * 
+     * @throws java.io.IOException
+     */
+    private boolean synchronize(boolean push, boolean pull) throws IOException {
+        URL host = m_discovery.discover();
 
-            for (String line = queryReader.readLine(); line != null; line = queryReader.readLine()) {
-                try {
-                    result.add(new Descriptor(line));
-                }
-                catch (IllegalArgumentException iae) {
-                    throw new IOException("Could not determine highest remote event id, received malformed event range: " + line);
-                }
-            }
+        List<Descriptor> localRanges = m_logStore.getDescriptors();
+        List<Descriptor> remoteRanges = getRanges(host);
+
+        boolean result = false;
+        if (push) {
+            result |= doPush(host, localRanges, remoteRanges);
         }
-        finally {
-            if (queryReader != null) {
-                try {
-                    queryReader.close();
-                }
-                catch (Exception ex) {
-                    // not much we can do
-                }
-            }
+        if (pull) {
+            result |= doPull(host, localRanges, remoteRanges);
         }
-        return result;
-
-    }
 
-    public String getName() {
-        return m_name;
+        return result;
     }
-}
\ No newline at end of file
+}