You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ja...@apache.org on 2012/10/17 17:09:29 UTC
svn commit: r1399299 - in /ace/trunk:
org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/
org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/
org.apache.ace.gateway.log/src/org/apa...
Author: jawi
Date: Wed Oct 17 15:09:29 2012
New Revision: 1399299
URL: http://svn.apache.org/viewvc?rev=1399299&view=rev
Log:
ACE-294: enable streaming mode for all HTTP-POSTs to the server.
Modified:
ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
Modified: ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java (original)
+++ ace/trunk/org.apache.ace.client.repository.helper.base/src/org/apache/ace/client/repository/helper/base/ArtifactPreprocessorBase.java Wed Oct 17 15:09:29 2012
@@ -38,21 +38,22 @@ import org.apache.ace.client.repository.
import org.apache.ace.connectionfactory.ConnectionFactory;
/**
- * This class can be used as a base class for artifact preprocessors. It comes with its
- * own upload() method, which will be used by all artifact preprocessors anyway.
+ * This class can be used as a base class for artifact preprocessors. It comes with its own upload() method, which will
+ * be used by all artifact preprocessors anyway.
*/
public abstract class ArtifactPreprocessorBase implements ArtifactPreprocessor {
- /** 64k buffers should be enough for everybody... */
+ /** 64k buffers should be enough for everybody... */
protected static final int BUFFER_SIZE = 64 * 1024;
-
+
protected final ConnectionFactory m_connectionFactory;
private final ExecutorService m_executor;
/**
* Creates a new {@link ArtifactPreprocessorBase} instance.
*
- * @param connectionFactory the connection factory to use, cannot be <code>null</code>.
+ * @param connectionFactory
+ * the connection factory to use, cannot be <code>null</code>.
*/
protected ArtifactPreprocessorBase(ConnectionFactory connectionFactory) {
m_connectionFactory = connectionFactory;
@@ -62,10 +63,13 @@ public abstract class ArtifactPreprocess
/**
* Creates a new URL for given (file) name and OBR base URL.
*
- * @param name the name of the file to create the URL for;
- * @param obrBase the OBR base URL to use.
+ * @param name
+ * the name of the file to create the URL for;
+ * @param obrBase
+ * the OBR base URL to use.
* @return a new URL for the file, never <code>null</code>.
- * @throws MalformedURLException in case of invalid characters in the given name.
+ * @throws MalformedURLException
+ * in case of invalid characters in the given name.
*/
protected URL determineNewUrl(String name, URL obrBase) throws MalformedURLException {
return new URL(obrBase, name);
@@ -74,7 +78,8 @@ public abstract class ArtifactPreprocess
/**
* Silently closes the given {@link Closeable} instance.
*
- * @param closable the closeable to close, may be <code>null</code>.
+ * @param closable
+ * the closeable to close, may be <code>null</code>.
*/
protected final void silentlyClose(Closeable closable) {
if (closable != null) {
@@ -90,44 +95,52 @@ public abstract class ArtifactPreprocess
/**
* Gets a stream to write an artifact to, which will be uploaded asynchronously to the OBR.
*
- * @param name The name of the artifact.
- * @param obrBase The base URL of the obr to which this artifact should be written.
- * @param inputStream the input stream with data to upload.
+ * @param name
+ * The name of the artifact.
+ * @param obrBase
+ * The base URL of the obr to which this artifact should be written.
+ * @param inputStream
+ * the input stream with data to upload.
*/
protected final Future<URL> uploadAsynchronously(final String name, final URL obrBase, final InputStream inputStream) {
return m_executor.submit(new Callable<URL>() {
public URL call() throws IOException {
- return upload(inputStream, name, obrBase);
+ return upload(inputStream, name, obrBase);
}
});
}
- /**
- * Converts a given URL to a {@link File} object.
- *
- * @param url the URL to convert, cannot be <code>null</code>.
- * @return a {@link File} object, never <code>null</code>.
- */
- protected final File urlToFile(URL url) {
- File file;
+ /**
+ * Converts a given URL to a {@link File} object.
+ *
+ * @param url
+ * the URL to convert, cannot be <code>null</code>.
+ * @return a {@link File} object, never <code>null</code>.
+ */
+ protected final File urlToFile(URL url) {
+ File file;
try {
file = new File(url.toURI());
}
catch (URISyntaxException e) {
file = new File(url.getPath());
}
- return file;
- }
+ return file;
+ }
/**
* Uploads an artifact synchronously to an OBR.
*
- * @param input A inputstream from which the artifact can be read.
- * @param name The name of the artifact. If the name is not unique, an IOException will be thrown.
- * @param obrBase The base URL of the obr to which this artifact should be written.
+ * @param input
+ * A inputstream from which the artifact can be read.
+ * @param name
+ * The name of the artifact. If the name is not unique, an IOException will be thrown.
+ * @param obrBase
+ * The base URL of the obr to which this artifact should be written.
* @return A URL to the uploaded artifact; this is identical to calling <code>determineNewUrl(name, obrBase)</code>
- * @throws IOException If there was an error reading from <code>input</code>, or if there was a problem communicating
- * with the OBR.
+ * @throws IOException
+ * If there was an error reading from <code>input</code>, or if there was a problem communicating with
+ * the OBR.
*/
private URL upload(InputStream input, String name, URL obrBase) throws IOException {
if (obrBase == null) {
@@ -142,12 +155,12 @@ public abstract class ArtifactPreprocess
url = determineNewUrl(name, obrBase);
if (!urlPointsToExistingFile(url)) {
- if ("file".equals(url.getProtocol())) {
- uploadToFile(input, url);
- }
- else {
- uploadToRemote(input, url);
- }
+ if ("file".equals(url.getProtocol())) {
+ uploadToFile(input, url);
+ }
+ else {
+ uploadToRemote(input, url);
+ }
}
}
catch (IOException ioe) {
@@ -163,9 +176,12 @@ public abstract class ArtifactPreprocess
/**
* Uploads an artifact to a local file location.
*
- * @param input the input stream of the (local) artifact to upload.
- * @param url the URL of the (file) artifact to upload to.
- * @throws IOException in case of I/O problems.
+ * @param input
+ * the input stream of the (local) artifact to upload.
+ * @param url
+ * the URL of the (file) artifact to upload to.
+ * @throws IOException
+ * in case of I/O problems.
*/
private void uploadToFile(InputStream input, URL url) throws IOException {
File file = urlToFile(url);
@@ -188,17 +204,26 @@ public abstract class ArtifactPreprocess
/**
* Uploads an artifact to a remote location.
*
- * @param input the input stream of the (local) artifact to upload.
- * @param url the URL of the (remote) artifact to upload to.
- * @throws IOException in case of I/O problems, or when the upload was refused by the remote.
+ * @param input
+ * the input stream of the (local) artifact to upload.
+ * @param url
+ * the URL of the (remote) artifact to upload to.
+ * @throws IOException
+ * in case of I/O problems, or when the upload was refused by the remote.
*/
private void uploadToRemote(InputStream input, URL url) throws IOException {
OutputStream output = null;
try {
URLConnection connection = m_connectionFactory.createConnection(url);
+ if (connection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) connection).setChunkedStreamingMode(8192);
+ }
connection.setDoOutput(true);
-
+
output = connection.getOutputStream();
byte[] buffer = new byte[BUFFER_SIZE];
@@ -226,48 +251,54 @@ public abstract class ArtifactPreprocess
silentlyClose(output);
}
}
-
+
/**
* Determines whether the given URL points to an existing file.
*
- * @param url the URL to test, cannot be <code>null</code>.
+ * @param url
+ * the URL to test, cannot be <code>null</code>.
* @return <code>true</code> if the given URL points to an existing file, <code>false</code> otherwise.
*/
private boolean urlPointsToExistingFile(URL url) {
- boolean result = false;
+ boolean result = false;
+
+ if ("file".equals(url.getProtocol())) {
+ result = urlToFile(url).exists();
+ }
+ else {
+ try {
+ URLConnection connection = m_connectionFactory.createConnection(url);
+
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection) connection;
+
+ // Perform a HEAD on the file, to see whether it exists...
+ hc.setRequestMethod("HEAD");
+ try {
+ int responseCode = hc.getResponseCode();
+ result = (responseCode == HttpURLConnection.HTTP_OK);
+ }
+ finally {
+ hc.disconnect();
+ }
+ }
+ else {
+ // In all other scenario's: try to read a single byte from the input
+ // stream, if this succeeds, we can assume the file exists...
+ InputStream is = connection.getInputStream();
+ try {
+ is.read();
+ }
+ finally {
+ silentlyClose(is);
+ }
+ }
+ }
+ catch (IOException e) {
+ // Ignore; assume file does not exist...
+ }
+ }
- if ("file".equals(url.getProtocol())) {
- result = urlToFile(url).exists();
- } else {
- try {
- URLConnection connection = m_connectionFactory.createConnection(url);
-
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc = (HttpURLConnection) connection;
-
- // Perform a HEAD on the file, to see whether it exists...
- hc.setRequestMethod("HEAD");
- try {
- int responseCode = hc.getResponseCode();
- result = (responseCode == HttpURLConnection.HTTP_OK);
- } finally {
- hc.disconnect();
- }
- } else {
- // In all other scenario's: try to read a single byte from the input
- // stream, if this succeeds, we can assume the file exists...
- InputStream is = connection.getInputStream();
- try {
- is.read();
- } finally {
- silentlyClose(is);
- }
- }
- } catch (IOException e) {
- // Ignore; assume file does not exist...
- }
- }
-
- return result;
+ return result;
}
}
Modified: ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java (original)
+++ ace/trunk/org.apache.ace.client.repository.impl/src/org/apache/ace/client/repository/impl/ArtifactRepositoryImpl.java Wed Oct 17 15:09:29 2012
@@ -52,23 +52,23 @@ import org.osgi.service.log.LogService;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
/**
- * Implementation class for the ArtifactRepository. For 'what it does', see ArtifactRepository,
- * for 'how it works', see ObjectRepositoryImpl.<br>
+ * Implementation class for the ArtifactRepository. For 'what it does', see ArtifactRepository, for 'how it works', see
+ * ObjectRepositoryImpl.<br>
* <br>
* This class has some extended functionality when compared to <code>ObjectRepositoryImpl</code>,
* <ul>
- * <li> it keeps track of all <code>ArtifactHelper</code>s, and serves them to its inhabitants.
- * <li> it handles importing of artifacts.
+ * <li>it keeps track of all <code>ArtifactHelper</code>s, and serves them to its inhabitants.
+ * <li>it handles importing of artifacts.
* </ul>
*/
public class ArtifactRepositoryImpl extends ObjectRepositoryImpl<ArtifactObjectImpl, ArtifactObject> implements ArtifactRepository {
private final static String XML_NODE = "artifacts";
-
+
// Injected by Dependency Manager
private volatile BundleContext m_context;
private volatile LogService m_log;
private volatile ConnectionFactory m_connectionFactory;
-
+
private final Map<String, ArtifactHelper> m_helpers = new HashMap<String, ArtifactHelper>();
private URL m_obrBase;
@@ -121,10 +121,11 @@ public class ArtifactRepositoryImpl exte
ArtifactObjectImpl createNewInhabitant(Map<String, String> attributes, Map<String, String> tags) {
ArtifactHelper helper = getHelper(attributes.get(ArtifactObject.KEY_MIMETYPE));
ArtifactObjectImpl ao = new ArtifactObjectImpl(helper.checkAttributes(attributes), helper.getMandatoryAttributes(), tags, this, this);
- if ((ao.getAttribute("upload") != null) && (m_obrBase != null)){
+ if ((ao.getAttribute("upload") != null) && (m_obrBase != null)) {
try {
ao.addAttribute(ArtifactObject.KEY_URL, new URL(m_obrBase, ao.getDefinition() + ao.getAttribute("upload")).toString());
- } catch (MalformedURLException e) {
+ }
+ catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
}
@@ -138,12 +139,15 @@ public class ArtifactRepositoryImpl exte
/**
* Helper method for this repository's inhabitants, which finds the necessary helpers.
- * @param mimetype The mimetype for which a helper should be found.
+ *
+ * @param mimetype
+ * The mimetype for which a helper should be found.
* @return An artifact helper for the given mimetype.
- * @throws IllegalArgumentException when the mimetype is invalid, or no helpers are available.
+ * @throws IllegalArgumentException
+ * when the mimetype is invalid, or no helpers are available.
*/
ArtifactHelper getHelper(String mimetype) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
throw new IllegalArgumentException("Without a mimetype, we cannot find a helper.");
}
@@ -162,7 +166,7 @@ public class ArtifactRepositoryImpl exte
* Method intended for adding artifact helpers by the bundle's activator.
*/
void addHelper(String mimetype, ArtifactHelper helper) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
m_log.log(LogService.LOG_WARNING, "An ArtifactHelper has been published without a proper mimetype.");
}
@@ -176,7 +180,7 @@ public class ArtifactRepositoryImpl exte
* Method intended for removing artifact helpers by the bundle's activator.
*/
void removeHelper(String mimetype, ArtifactHelper helper) {
- synchronized(m_helpers) {
+ synchronized (m_helpers) {
if ((mimetype == null) || (mimetype.length() == 0)) {
m_log.log(LogService.LOG_WARNING, "An ArtifactHelper is being removed without a proper mimetype.");
}
@@ -187,13 +191,14 @@ public class ArtifactRepositoryImpl exte
}
/**
- * Utility function that takes either a URL or a String representing a mimetype,
- * and returns the corresponding <code>ArtifactHelper</code>, <code>ArtifactRecognizer</code>
- * and, if not specified, the mimetype.
- * @param input Either a <code>URL</code> pointing to a physical artifact, or a <code>String</code>
- * representing a mime type.
+ * Utility function that takes either a URL or a String representing a mimetype, and returns the corresponding
+ * <code>ArtifactHelper</code>, <code>ArtifactRecognizer</code> and, if not specified, the mimetype.
+ *
+ * @param input
+ * Either a <code>URL</code> pointing to a physical artifact, or a <code>String</code> representing a
+ * mime type.
* @return A mapping from a class (<code>ArtifactRecognizer</code>, <code>ArtifactHelper</code> or
- * <code>String</code> to an instance of that class as a result.
+ * <code>String</code> to an instance of that class as a result.
*/
protected Map<Class<?>, Object> findRecognizerAndHelper(Object input) throws IllegalArgumentException {
// check input.
@@ -248,7 +253,8 @@ public class ArtifactRepositoryImpl exte
break;
}
}
- } finally {
+ }
+ finally {
m_context.ungetService(ref);
}
}
@@ -278,7 +284,7 @@ public class ArtifactRepositoryImpl exte
return mimetype != null;
}
catch (Exception e) {
- //too bad... Nothing to do now.
+ // too bad... Nothing to do now.
return false;
}
}
@@ -336,7 +342,7 @@ public class ArtifactRepositoryImpl exte
private ArtifactObject importArtifact(URL artifact, ArtifactRecognizer recognizer, ArtifactHelper helper, String mimetype, boolean overwrite, boolean upload) throws IOException {
ArtifactResource resource = convertToArtifactResource(artifact);
-
+
Map<String, String> attributes = recognizer.extractMetaData(resource);
Map<String, String> tags = new HashMap<String, String>();
@@ -347,39 +353,44 @@ public class ArtifactRepositoryImpl exte
}
String artifactURL = artifact.toString();
-
+
attributes.put(ArtifactObject.KEY_URL, artifactURL);
-
+
if (upload) {
attributes.put("upload", recognizer.getExtension(resource));
}
ArtifactObject result = create(attributes, tags);
-
+
if (upload) {
try {
upload(artifact, result.getDefinition() + attributes.get("upload"), mimetype);
- } catch (IOException ex) {
+ }
+ catch (IOException ex) {
remove(result);
throw ex;
}
finally {
try {
attributes.remove("upload");
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
// Not much we can do
}
}
}
return result;
-
+
}
/**
- * Helper method which checks a given URL for 'validity', that is, does this URL point
- * to something that can be read.
- * @param artifact A URL pointing to an artifact.
- * @throws IllegalArgumentException when the URL does not point to a valid file.
+ * Helper method which checks a given URL for 'validity', that is, does this URL point to something that can be
+ * read.
+ *
+ * @param artifact
+ * A URL pointing to an artifact.
+ * @throws IllegalArgumentException
+ * when the URL does not point to a valid file.
*/
private void checkURL(URL artifact) throws IllegalArgumentException {
@@ -406,17 +417,21 @@ public class ArtifactRepositoryImpl exte
String artifactName = artifact.toString();
for (byte b : artifactName.substring(artifactName.lastIndexOf('/') + 1).getBytes()) {
if (!(((b >= 'A') && (b <= 'Z')) || ((b >= 'a') && (b <= 'z')) || ((b >= '0') && (b <= '9')) || (b == '.') || (b == '-') || (b == '_'))) {
- throw new IllegalArgumentException("Artifact " + artifactName + "'s name contains an illegal character '" + new String(new byte[] {b}) + "'");
+ throw new IllegalArgumentException("Artifact " + artifactName + "'s name contains an illegal character '" + new String(new byte[] { b }) + "'");
}
}
}
/**
* Uploads an artifact to the OBR.
- * @param artifact URL pointing to the local artifact.
- * @param mimetype The mimetype of this artifact.
+ *
+ * @param artifact
+ * URL pointing to the local artifact.
+ * @param mimetype
+ * The mimetype of this artifact.
* @return The persistent URL of this artifact.
- * @throws IOException for any problem uploading the artifact.
+ * @throws IOException
+ * for any problem uploading the artifact.
*/
private URL upload(URL artifact, String definition, String mimetype) throws IOException {
if (m_obrBase == null) {
@@ -432,11 +447,18 @@ public class ArtifactRepositoryImpl exte
url = new URL(m_obrBase, definition);
URLConnection connection = m_connectionFactory.createConnection(url);
-
+
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
+
connection.setRequestProperty("Content-Type", mimetype);
+ if (connection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) connection).setChunkedStreamingMode(8192);
+ }
output = connection.getOutputStream();
@@ -446,11 +468,11 @@ public class ArtifactRepositoryImpl exte
}
output.close();
-
+
if (connection instanceof HttpURLConnection) {
int responseCode = ((HttpURLConnection) connection).getResponseCode();
switch (responseCode) {
- case HttpURLConnection.HTTP_OK :
+ case HttpURLConnection.HTTP_OK:
break;
case HttpURLConnection.HTTP_CONFLICT:
throw new IOException("Artifact already exists in storage.");
@@ -517,7 +539,13 @@ public class ArtifactRepositoryImpl exte
/**
* Custom comparator which sorts service references by service rank, highest rank first.
*/
- private static Comparator<ServiceReference> SERVICE_RANK_COMPARATOR = new Comparator<ServiceReference>() { // TODO ServiceReferences are comparable by default now
+ private static Comparator<ServiceReference> SERVICE_RANK_COMPARATOR = new Comparator<ServiceReference>() { // TODO
+ // ServiceReferences
+ // are
+ // comparable
+ // by
+ // default
+ // now
public int compare(ServiceReference o1, ServiceReference o2) {
int rank1 = 0;
int rank2 = 0;
@@ -539,18 +567,19 @@ public class ArtifactRepositoryImpl exte
return rank1 - rank2;
}
};
-
+
private InputStream openInputStream(URL artifactURL) throws IOException {
URLConnection connection = m_connectionFactory.createConnection(artifactURL);
return connection.getInputStream();
}
/**
- * Converts a given URL to a {@link ArtifactResource} that abstracts the way we access the contents of
- * the URL away from the URL itself. This way, we can avoid having to pass authentication credentials,
- * or a {@link ConnectionFactory} to the artifact recognizers.
- *
- * @param url the URL to convert, can be <code>null</code> in which case <code>null</code> is returned.
+ * Converts a given URL to a {@link ArtifactResource} that abstracts the way we access the contents of the URL away
+ * from the URL itself. This way, we can avoid having to pass authentication credentials, or a
+ * {@link ConnectionFactory} to the artifact recognizers.
+ *
+ * @param url
+ * the URL to convert, can be <code>null</code> in which case <code>null</code> is returned.
* @return an {@link ArtifactResource}, or <code>null</code> if the given URL was <code>null</code>.
*/
private ArtifactResource convertToArtifactResource(final URL url) {
@@ -562,7 +591,7 @@ public class ArtifactRepositoryImpl exte
public URL getURL() {
return url;
}
-
+
public InputStream openStream() throws IOException {
// Take care of the fact that an URL could need credentials to be accessible!!!
URLConnection conn = m_connectionFactory.createConnection(getURL());
Modified: ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java (original)
+++ ace/trunk/org.apache.ace.gateway.log/src/org/apache/ace/target/log/task/LogSyncTask.java Wed Oct 17 15:09:29 2012
@@ -62,8 +62,7 @@ public class LogSyncTask implements Runn
}
/**
- * Synchronize the log events available remote with the events available
- * locally.
+ * Synchronize the log events available remote with the events available locally.
*/
public void run() {
URL host = m_discovery.discover();
@@ -74,28 +73,34 @@ public class LogSyncTask implements Runn
m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
return;
}
-
- if ("file".equals(host.getProtocol())) {
- // if the discovery URL is a file, we cannot sync, so we silently return here
- return;
- }
- String targetId = m_identification.getID();
+ if ("file".equals(host.getProtocol())) {
+ // if the discovery URL is a file, we cannot sync, so we silently return here
+ return;
+ }
+
+ String targetId = m_identification.getID();
URLConnection sendConnection = null;
try {
sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
sendConnection.setDoOutput(true);
+ if (sendConnection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+ }
long[] logIDs = m_LogStore.getLogIDs();
for (int i = 0; i < logIDs.length; i++) {
URL url = new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + targetId + "&" + PARAMETER_LOGID + "=" + logIDs[i]);
-
+
URLConnection queryConnection = m_connectionFactory.createConnection(url);
// TODO: make sure no actual call is made using sendConnection
// when there's nothing to sync
synchronizeLog(logIDs[i], queryConnection.getInputStream(), sendConnection);
}
-
+
// Make sure to send the actual POST request...
sendConnection.getContent();
}
@@ -113,20 +118,16 @@ public class LogSyncTask implements Runn
}
/**
- * Synchronizes a single log (there can be multiple log/logid's per
- * target).
+ * Synchronizes a single log (there can be multiple log/logid's per target).
*
* @param logID
* ID of the log to synchronize.
* @param queryInput
- * Stream pointing to a query result for the events available
- * remotely for this log id
+ * Stream pointing to a query result for the events available remotely for this log id
* @param sendConnection
- * .getOutputStream() Stream to write the events to that are
- * missing on the remote side.
+ * .getOutputStream() Stream to write the events to that are missing on the remote side.
* @throws java.io.IOException
- * If synchronization could not be completed due to an I/O
- * failure.
+ * If synchronization could not be completed due to an I/O failure.
*/
protected void synchronizeLog(long logID, InputStream queryInput, URLConnection sendConnection) throws IOException {
long highestLocal = m_LogStore.getHighestID(logID);
@@ -134,14 +135,14 @@ public class LogSyncTask implements Runn
// No events, no need to synchronize
return;
}
-
+
SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet();
SortedRangeSet delta = remoteRange.diffDest(localRange);
RangeIterator rangeIterator = delta.iterator();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
-
+
if (rangeIterator.hasNext()) {
long lowest = rangeIterator.next();
long highest = delta.getHigh();
@@ -171,8 +172,7 @@ public class LogSyncTask implements Runn
*
* @param queryInput
* Stream containing a LogDescriptor object.
- * @return LogDescriptor object reflecting the range contained in the
- * stream.
+ * @return LogDescriptor object reflecting the range contained in the stream.
* @throws java.io.IOException
* If no range could be determined due to an I/O failure.
*/
@@ -184,7 +184,7 @@ public class LogSyncTask implements Runn
if (rangeString != null) {
try {
return new LogDescriptor(rangeString);
- }
+ }
catch (IllegalArgumentException iae) {
throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")");
}
@@ -192,16 +192,16 @@ public class LogSyncTask implements Runn
else {
throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
}
- }
+ }
finally {
if (queryReader != null) {
try {
queryReader.close();
- }
+ }
catch (Exception ex) {
// not much we can do
}
}
}
}
-}
\ No newline at end of file
+}
Modified: ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java (original)
+++ ace/trunk/org.apache.ace.log.task/src/org/apache/ace/server/log/task/LogSyncTask.java Wed Oct 17 15:09:29 2012
@@ -119,8 +119,15 @@ public class LogSyncTask implements Runn
OutputStream sendOutput = null;
try {
URLConnection sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
- sendConnection.setDoOutput(true);
+ if (sendConnection instanceof HttpURLConnection) {
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+ }
+ sendConnection.setDoOutput(true);
+
sendOutput = sendConnection.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendOutput));
Modified: ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java (original)
+++ ace/trunk/org.apache.ace.repository.ext/src/org/apache/ace/repository/ext/impl/RemoteRepository.java Wed Oct 17 15:09:29 2012
@@ -94,8 +94,12 @@ public class RemoteRepository implements
URL url = buildCommand(m_url, COMMAND_COMMIT, fromVersion);
HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(url);
- connection.setDoOutput(true);
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ connection.setChunkedStreamingMode(8192);
connection.setRequestProperty("Content-Type", MIME_APPLICATION_OCTET_STREAM);
+ connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
try {
Modified: ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java?rev=1399299&r1=1399298&r2=1399299&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java (original)
+++ ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java Wed Oct 17 15:09:29 2012
@@ -60,6 +60,10 @@ final class Utils {
URL url = new URL(host, endpoint + "?customer=" + customer + "&name=" + name + "&version=" + version);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
+ // ACE-294: enable streaming mode causing only small amounts of memory to be
+ // used for this commit. Otherwise, the entire input stream is cached into
+ // memory prior to sending it to the server...
+ connection.setChunkedStreamingMode(8192);
connection.setRequestProperty("Content-Type", MIME_APPLICATION_OCTET_STREAM);
OutputStream out = connection.getOutputStream();
copy(in, out);