You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2013/10/29 12:11:16 UTC
svn commit: r1536664 -
/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
Author: jawi
Date: Tue Oct 29 11:11:15 2013
New Revision: 1536664
URL: http://svn.apache.org/r1536664
Log:
ACE-389 - configure scaling with relays:
- cleaned up the log-sync task a bit, made sure all resources
are properly closed, and that we're correctly synchronizing
with the server (by checking the response codes).
Modified:
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java?rev=1536664&r1=1536663&r2=1536664&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java Tue Oct 29 11:11:15 2013
@@ -20,12 +20,14 @@ package org.apache.ace.log.server.task;
import java.io.BufferedReader;
import java.io.BufferedWriter;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
@@ -33,6 +35,8 @@ import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.ace.connectionfactory.ConnectionFactory;
import org.apache.ace.discovery.Discovery;
import org.apache.ace.feedback.Descriptor;
@@ -44,100 +48,163 @@ import org.osgi.service.log.LogService;
public class LogSyncTask implements Runnable, LogSync {
+ public static enum Mode {
+ PUSH, PULL, PUSHPULL
+ }
+
private static final String COMMAND_QUERY = "query";
private static final String COMMAND_SEND = "send";
- private static final String COMMAND_RECEIVE = "receive";
+ private static final String COMMAND_RECEIVE = "receive";
private static final String TARGETID_KEY = "tid";
@SuppressWarnings("unused")
private static final String FILTER_KEY = "filter";
private static final String LOGID_KEY = "logid";
- private static final String RANGE_KEY = "range";
+ private static final String RANGE_KEY = "range";
// injected by dependencymanager
private volatile Discovery m_discovery;
private volatile LogService m_log;
private volatile LogStore m_logStore;
+
private volatile ConnectionFactory m_connectionFactory;
-
private final String m_endpoint;
private final String m_name;
- private final Mode m_mode;
- public static enum Mode { PUSH, PULL, PUSHPULL };
-
+ private final Mode m_mode;;
+
public LogSyncTask(String endpoint, String name, Mode mode) {
m_endpoint = endpoint;
m_name = name;
- m_mode = mode;
+ m_mode = mode;
+ }
+
+ public String getName() {
+ return m_name;
+ }
+
+ public boolean pull() throws IOException {
+ return synchronize(false /* push */, true /* pull */);
+ }
+
+ public boolean push() throws IOException {
+ return synchronize(true /* push */, false /* pull */);
+ }
+
+ public boolean pushpull() throws IOException {
+ return synchronize(true /* push */, true /* pull */);
}
public void run() {
try {
- switch (m_mode) {
- case PULL:
- pull();
- break;
- case PUSH:
- push();
- break;
- case PUSHPULL:
- pushpull();
- break;
- }
+ switch (m_mode) {
+ case PULL:
+ pull();
+ break;
+ case PUSH:
+ push();
+ break;
+ case PUSHPULL:
+ pushpull();
+ break;
+ }
}
catch (MalformedURLException e) {
- m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote");
+ m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote (malformed URL, incorrect configuration?)");
+ }
+ catch (ConnectException e) {
+ m_log.log(LogService.LOG_WARNING, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote (connection refused, remote not up?)");
}
catch (IOException e) {
m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote", e);
}
}
- public boolean pull() throws IOException {
- return synchronize(false, true);
+ /**
+ * Calculates the difference between two lists of <code>Descriptor</code>. The result will contain whatever is not
+ * in <code>destination</code>, but is in <code>source</code>.
+ */
+ protected List<Descriptor> calculateDelta(List<Descriptor> source, List<Descriptor> destination) {
+ /*
+ * For each local descriptor, we try to find a matching remote one. If so, we will synchronize all events that
+ * the remote does not have. If we do not find a matching one at all, we send the complete local log.
+ */
+ List<Descriptor> result = new ArrayList<Descriptor>();
+ for (Descriptor s : source) {
+ Descriptor diffs = s;
+ for (Descriptor d : destination) {
+ if ((s.getStoreID() == d.getStoreID()) && (s.getTargetID().equals(d.getTargetID()))) {
+ SortedRangeSet rangeDiff = d.getRangeSet().diffDest(s.getRangeSet());
+ if (!isEmptyRangeSet(rangeDiff)) {
+ diffs = new Descriptor(s.getTargetID(), s.getStoreID(), rangeDiff);
+ }
+ else {
+ diffs = null;
+ }
+ }
+ }
+ if (diffs != null) {
+ result.add(diffs);
+ }
+ }
+ return result;
}
- public boolean push() throws IOException {
- return synchronize(true, false);
- }
+ protected boolean doPull(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
+ List<Descriptor> delta = calculateDelta(remoteRanges, localRanges);
- public boolean pushpull() throws IOException {
- return synchronize(true, true);
- }
+ boolean result = !delta.isEmpty();
+ for (Descriptor descriptor : delta) {
+ InputStream receiveInput = null;
+ URLConnection receiveConnection = null;
+ try {
+ /*
+ * The request currently contains a range. This is not yet supported by the servlet, but it will simply
+ * be ignored.
+ */
+ URL url = createReceiveURL(host, descriptor);
- /**
- * Synchronizes the local store with the discovered remote one.
- * @throws java.io.IOException
- */
- private boolean synchronize(boolean push, boolean pull) throws IOException {
- URL host = m_discovery.discover();
+ receiveConnection = createConnection(url);
+ receiveInput = receiveConnection.getInputStream();
- URLConnection queryConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_QUERY));
- InputStream queryInput = queryConnection.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(receiveInput));
+ try {
+ readLogs(reader);
+ }
+ finally {
+ reader.close();
+ }
- List<Descriptor> localRanges = m_logStore.getDescriptors();
- List<Descriptor> remoteRanges = getRanges(queryInput);
+ if (receiveConnection instanceof HttpURLConnection) {
+ // Will cause a flush and reads the response from the server...
+ result = ((HttpURLConnection) receiveConnection).getResponseCode() == HttpServletResponse.SC_OK;
+ }
- boolean result = false;
- if (push) {
- result |= doPush(host, localRanges, remoteRanges);
- }
- if (pull) {
- result |= doPull(host, localRanges, remoteRanges);
+ m_log.log(LogService.LOG_DEBUG, "Pulled log (" + m_name + ") successfully from remote...");
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to connect to retrieve log events.", e);
+ }
+ finally {
+ closeSilently(receiveInput);
+ closeSilently(receiveConnection);
+ }
}
return result;
}
protected boolean doPush(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
- boolean result = false;
+ List<Descriptor> delta = calculateDelta(localRanges, remoteRanges);
+ boolean result = !delta.isEmpty();
+
OutputStream sendOutput = null;
+ URLConnection sendConnection = null;
+
try {
- URLConnection sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
-
+ sendConnection = createConnection(createURL(host, COMMAND_SEND));
if (sendConnection instanceof HttpURLConnection) {
// ACE-294: enable streaming mode causing only small amounts of memory to be
- // used for this commit. Otherwise, the entire input stream is cached into
+ // used for this commit. Otherwise, the entire input stream is cached into
// memory prior to sending it to the server...
((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
}
@@ -146,180 +213,167 @@ public class LogSyncTask implements Runn
sendOutput = sendConnection.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendOutput));
- List<Descriptor> delta = calculateDelta(localRanges, remoteRanges);
- result = !delta.isEmpty();
- writeDelta(delta, writer);
-
- sendOutput.flush();
- sendOutput.close();
+ try {
+ writeDelta(delta, writer);
+ }
+ finally {
+ writer.close();
+ }
if (sendConnection instanceof HttpURLConnection) {
- HttpURLConnection conn = (HttpURLConnection) sendConnection;
- conn.getContent();
- conn.disconnect();
+ // Will cause a flush and reads the response from the server...
+ result = ((HttpURLConnection) sendConnection).getResponseCode() == HttpServletResponse.SC_OK;
}
+
+ m_log.log(LogService.LOG_DEBUG, "Pushed log (" + m_name + ") successfully to remote...");
}
catch (IOException e) {
m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote", e);
}
finally {
- if (sendOutput != null) {
+ closeSilently(sendOutput);
+ closeSilently(sendConnection);
+ }
+ return result;
+ }
+
+ protected List<Descriptor> getRanges(URL host) throws IOException {
+ List<Descriptor> result = new ArrayList<Descriptor>();
+
+ URLConnection queryConnection = null;
+ InputStream queryInput = null;
+ try {
+ queryConnection = createConnection(createURL(host, COMMAND_QUERY));
+ queryInput = queryConnection.getInputStream();
+
+ BufferedReader queryReader = new BufferedReader(new InputStreamReader(queryInput));
+
+ for (String line = queryReader.readLine(); line != null; line = queryReader.readLine()) {
try {
- sendOutput.close();
+ result.add(new Descriptor(line));
}
- catch (Exception ex) {
- // not much we can do
+ catch (IllegalArgumentException iae) {
+ queryReader.close();
+
+ throw new IOException("Could not determine highest remote event id, received malformed event range: " + line);
}
}
}
+ finally {
+ closeSilently(queryInput);
+ closeSilently(queryConnection);
+ }
return result;
+
+ }
+
+ protected void readLogs(BufferedReader reader) {
+ try {
+ List<Event> events = new ArrayList<Event>();
+
+ String eventString = null;
+ while ((eventString = reader.readLine()) != null) {
+ try {
+ events.add(new Event(eventString));
+ }
+ catch (IllegalArgumentException e) {
+ // Just skip this one.
+ }
+ }
+ m_logStore.put(events);
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_DEBUG, "Error reading line from reader", e);
+ }
+
}
/**
* Writes the difference between local and remote to a writer.
- * @param descriptors A list of Descriptors that identifies all local log entries that need to be written.
- * @param writer A writer to write to.
+ *
+ * @param descriptors
+ * A list of Descriptors that identifies all local log entries that need to be written.
+ * @param writer
+ * A writer to write to.
* @throws java.io.IOException
*/
protected void writeDelta(List<Descriptor> descriptors, Writer writer) throws IOException {
for (Descriptor l : descriptors) {
writeDescriptor(l, writer);
}
+ writer.flush();
}
/**
* Writes the Events described by the descriptor to the writer.
- * @param descriptor A Descriptor that identifies the events to be written.
- * @param writer A writer to write the events to.
- * @throws java.io.IOException Thrown when either the writer goes wrong, or there is a problem
- * communicating with the local log store.
+ *
+ * @param descriptor
+ * A Descriptor that identifies the events to be written.
+ * @param writer
+ * A writer to write the events to.
+ * @throws java.io.IOException
+ * Thrown when either the writer goes wrong, or there is a problem communicating with the local log
+ * store.
*/
protected void writeDescriptor(Descriptor descriptor, Writer writer) throws IOException {
List<Event> events = m_logStore.get(descriptor);
for (Event event : events) {
writer.write(event.toRepresentation() + "\n");
}
- writer.flush();
}
- protected boolean doPull(URL host, List<Descriptor> localRanges, List<Descriptor> remoteRanges) {
- boolean result = false;
- List<Descriptor> delta = calculateDelta(remoteRanges, localRanges);
- result = !delta.isEmpty();
- for (Descriptor l : delta) {
+ private void closeSilently(Closeable resource) {
+ if (resource != null) {
try {
- /*
- * The request currently contains a range. This is not yet supported by the servlet, but it will
- * simply be ignored.
- */
- URL url = new URL(host, m_endpoint + "/" + COMMAND_RECEIVE + "?" + TARGETID_KEY + "=" + l.getTargetID() + "&" + LOGID_KEY + "=" + l.getStoreID() + "&" + RANGE_KEY + "=" + l.getRangeSet().toRepresentation());
-
- URLConnection receiveConnection = m_connectionFactory.createConnection(url);
- InputStream receiveInput = receiveConnection.getInputStream();
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(receiveInput));
- readLogs(reader);
-
- if (receiveConnection instanceof HttpURLConnection) {
- HttpURLConnection conn = (HttpURLConnection) receiveConnection;
- conn.getContent();
- conn.disconnect();
- }
+ resource.close();
}
- catch (IOException e) {
- m_log.log(LogService.LOG_ERROR, "Unable to connect to retrieve log events.", e);
+ catch (IOException exception) {
+ // Ignore, not much we can do...
}
}
- return result;
}
- protected void readLogs(BufferedReader reader) {
- try {
- List<Event> events = new ArrayList<Event>();
-
- String eventString = null;
- while ((eventString = reader.readLine()) != null) {
- try {
- Event event = new Event(eventString);
- events.add(event);
- }
- catch (IllegalArgumentException e) {
- // Just skip this one.
- }
- }
- m_logStore.put(events);
- }
- catch (IOException e) {
- m_log.log(LogService.LOG_DEBUG, "Error reading line from reader", e);
+ private void closeSilently(URLConnection resource) {
+ if (resource instanceof HttpURLConnection) {
+ ((HttpURLConnection) resource).disconnect();
}
+ }
+ private URLConnection createConnection(URL url) throws IOException {
+ return m_connectionFactory.createConnection(url);
}
- /**
- * Calculates the difference between two lists of <code>Descriptor</code>. The result will contain whatever is
- * not in <code>destination</code>, but is in <code>source</code>.
- */
- protected List<Descriptor> calculateDelta(List<Descriptor> source, List<Descriptor> destination) {
- /*
- * For each local descriptor, we try to find a matching remote one. If so, we will synchronize all events
- * that the remote does not have. If we do not find a matching one at all, we send the complete local
- * log.
- */
- List<Descriptor> result = new ArrayList<Descriptor>();
- for (Descriptor s : source) {
- Descriptor diffs = s;
- for (Descriptor d : destination) {
- if ((s.getStoreID() == d.getStoreID()) && (s.getTargetID().equals(d.getTargetID()))) {
- SortedRangeSet rangeDiff = d.getRangeSet().diffDest(s.getRangeSet());
- if (!isEmptyRangeSet(rangeDiff)) {
- diffs = new Descriptor(s.getTargetID(), s.getStoreID(), rangeDiff);
- }
- else {
- diffs = null;
- }
- }
- }
- if (diffs != null) {
- result.add(diffs);
- }
- }
- return result;
+ private URL createReceiveURL(URL host, Descriptor l) throws MalformedURLException {
+ return new URL(host, String.format("%s/%s?%s=%s&%s=%s&%s=%s", m_endpoint, COMMAND_RECEIVE, TARGETID_KEY, l.getTargetID(), LOGID_KEY, l.getStoreID(), RANGE_KEY, l.getRangeSet().toRepresentation()));
+ }
+
+ private URL createURL(URL host, String command) throws MalformedURLException {
+ return new URL(host, m_endpoint.concat("/").concat(command));
}
private boolean isEmptyRangeSet(SortedRangeSet set) {
return !set.iterator().hasNext();
}
- protected List<Descriptor> getRanges(InputStream stream) throws IOException {
- List<Descriptor> result = new ArrayList<Descriptor>();
- BufferedReader queryReader = null;
- try {
- queryReader = new BufferedReader(new InputStreamReader(stream));
+ /**
+ * Synchronizes the local store with the discovered remote one.
+ *
+ * @throws java.io.IOException
+ */
+ private boolean synchronize(boolean push, boolean pull) throws IOException {
+ URL host = m_discovery.discover();
- for (String line = queryReader.readLine(); line != null; line = queryReader.readLine()) {
- try {
- result.add(new Descriptor(line));
- }
- catch (IllegalArgumentException iae) {
- throw new IOException("Could not determine highest remote event id, received malformed event range: " + line);
- }
- }
+ List<Descriptor> localRanges = m_logStore.getDescriptors();
+ List<Descriptor> remoteRanges = getRanges(host);
+
+ boolean result = false;
+ if (push) {
+ result |= doPush(host, localRanges, remoteRanges);
}
- finally {
- if (queryReader != null) {
- try {
- queryReader.close();
- }
- catch (Exception ex) {
- // not much we can do
- }
- }
+ if (pull) {
+ result |= doPull(host, localRanges, remoteRanges);
}
- return result;
-
- }
- public String getName() {
- return m_name;
+ return result;
}
-}
\ No newline at end of file
+}