You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by na...@apache.org on 2015/03/10 22:06:18 UTC
jclouds git commit: JCLOUDS-532: Properly close HTTP streams
Repository: jclouds
Updated Branches:
refs/heads/master 9f9394152 -> ec63b55a0
JCLOUDS-532: Properly close HTTP streams
Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/ec63b55a
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/ec63b55a
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/ec63b55a
Branch: refs/heads/master
Commit: ec63b55a04afdd5a970d8b05674deb4c3802c59c
Parents: 9f93941
Author: Ignasi Barrera <na...@apache.org>
Authored: Mon Mar 9 12:03:18 2015 +0100
Committer: Ignasi Barrera <na...@apache.org>
Committed: Tue Mar 10 21:28:14 2015 +0100
----------------------------------------------------------------------
.../handlers/BackoffLimitedRetryHandler.java | 2 -
.../BaseHttpCommandExecutorService.java | 9 +-
.../JavaUrlHttpCommandExecutorService.java | 16 +-
.../BackoffLimitedRetryHandlerTest.java | 31 +-
.../BaseHttpCommandExecutorServiceTest.java | 280 +++++++++++++++++++
5 files changed, 301 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
index 481cde2..978406c 100644
--- a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
+++ b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
@@ -17,7 +17,6 @@
package org.jclouds.http.handlers;
import static java.lang.Math.max;
-import static org.jclouds.http.HttpUtils.releasePayload;
import java.io.IOException;
import java.util.Random;
@@ -95,7 +94,6 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler, IOException
}
public boolean shouldRetryRequest(HttpCommand command, HttpResponse response) {
- releasePayload(response);
return ifReplayableBackoffAndReturnTrue(command);
}
http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
index 1eda6a9..e770658 100644
--- a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
+++ b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
@@ -19,6 +19,7 @@ package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
+import static org.jclouds.http.HttpUtils.releasePayload;
import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
@@ -42,6 +43,8 @@ import org.jclouds.http.handlers.DelegatingRetryHandler;
import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.logging.Logger;
+import com.google.common.annotations.VisibleForTesting;
+
public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
protected final HttpUtils utils;
protected final ContentMetadataCodec contentMetadataCodec;
@@ -120,13 +123,17 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
return response;
}
- private boolean shouldContinue(HttpCommand command, HttpResponse response) {
+ @VisibleForTesting
+ boolean shouldContinue(HttpCommand command, HttpResponse response) {
boolean shouldContinue = false;
if (retryHandler.shouldRetryRequest(command, response)) {
shouldContinue = true;
} else {
errorHandler.handleError(command, response);
}
+ // At this point we are going to send a new request or we have just handled the error, so
+ // we should make sure that any open stream is closed.
+ releasePayload(response);
return shouldContinue;
}
http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
index bafe7f3..ce36149 100644
--- a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
+++ b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
@@ -18,7 +18,6 @@ package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
-import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.net.HttpHeaders.CONTENT_LENGTH;
import static com.google.common.net.HttpHeaders.HOST;
import static com.google.common.net.HttpHeaders.USER_AGENT;
@@ -26,7 +25,6 @@ import static org.jclouds.http.HttpUtils.filterOutContentHeaders;
import static org.jclouds.io.Payloads.newInputStreamPayload;
import static org.jclouds.util.Closeables2.closeQuietly;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
@@ -96,7 +94,7 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
try {
in = connection.getInputStream();
} catch (IOException e) {
- in = bufferAndCloseStream(connection.getErrorStream());
+ in = connection.getErrorStream();
} catch (RuntimeException e) {
closeQuietly(in);
throw e;
@@ -127,18 +125,6 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
return builder.build();
}
- private InputStream bufferAndCloseStream(InputStream inputStream) throws IOException {
- InputStream in = null;
- try {
- if (inputStream != null) {
- in = new ByteArrayInputStream(toByteArray(inputStream));
- }
- } finally {
- closeQuietly(inputStream);
- }
- return in;
- }
-
@Override
protected HttpURLConnection convert(HttpRequest request) throws IOException, InterruptedException {
boolean chunked = "chunked".equals(request.getFirstHeaderOrNull("Transfer-Encoding"));
http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
index 4e2bfe6..cc25c27 100644
--- a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
+++ b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
@@ -19,6 +19,7 @@ package org.jclouds.http.handlers;
import static org.jclouds.reflect.Reflection2.method;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
@@ -117,46 +118,38 @@ public class BackoffLimitedRetryHandlerTest {
}
@Test
- void testClosesInputStream() throws InterruptedException, IOException, SecurityException, NoSuchMethodException {
+ void testInputStreamIsNotClosed() throws SecurityException, NoSuchMethodException, IOException {
HttpCommand command = createCommand();
-
- HttpResponse response = HttpResponse.builder().statusCode(400).build();
+ HttpResponse response = HttpResponse.builder().statusCode(500).build();
InputStream inputStream = new InputStream() {
- boolean isOpen = true;
+ int count = 2;
@Override
public void close() {
- this.isOpen = false;
+ fail("The retry handler should not close the response stream");
}
- int count = 1;
-
@Override
public int read() throws IOException {
- if (this.isOpen)
- return (count > -1) ? count-- : -1;
- else
- return -1;
+ return count < 0 ? -1 : --count;
}
@Override
public int available() throws IOException {
- if (this.isOpen)
- return count;
- else
- return 0;
+ return count < 0 ? 0 : count;
}
};
+
response.setPayload(Payloads.newInputStreamPayload(inputStream));
response.getPayload().getContentMetadata().setContentLength(1l);
- assertEquals(response.getPayload().getInput().available(), 1);
- assertEquals(response.getPayload().getInput().read(), 1);
+ assertEquals(response.getPayload().openStream().available(), 2);
+ assertEquals(response.getPayload().openStream().read(), 1);
handler.shouldRetryRequest(command, response);
- assertEquals(response.getPayload().getInput().available(), 0);
- assertEquals(response.getPayload().getInput().read(), -1);
+ assertEquals(response.getPayload().openStream().available(), 1);
+ assertEquals(response.getPayload().openStream().read(), 0);
}
private final Function<Invocation, HttpRequest> processor = ContextBuilder
http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
new file mode 100644
index 0000000..8fa399e
--- /dev/null
+++ b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.http.internal;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jclouds.http.HttpUtils.closeClientButKeepContentStream;
+import static org.jclouds.http.HttpUtils.releasePayload;
+import static org.jclouds.io.Payloads.newInputStreamPayload;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.jclouds.http.HttpCommand;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.HttpUtils;
+import org.jclouds.http.IOExceptionRetryHandler;
+import org.jclouds.http.handlers.DelegatingErrorHandler;
+import org.jclouds.http.handlers.DelegatingRetryHandler;
+import org.jclouds.io.ContentMetadataCodec;
+import org.jclouds.rest.internal.BaseHttpApiMetadata;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+@Test(groups = "unit", testName = "BaseHttpCommandExecutorServiceTest")
+public class BaseHttpCommandExecutorServiceTest {
+
+ public void testStreamIsClosedWhenRetrying() throws IOException {
+ MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+ HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+ response.getPayload().getContentMetadata().setContentLength(1l);
+ HttpCommand command = mockHttpCommand();
+
+ DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+ DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+ expect(retryHandler.shouldRetryRequest(command, response)).andReturn(true);
+ replay(retryHandler, errorHandler);
+
+ // Verify the stream is open. This consumes one byte.
+ assertEquals(response.getPayload().openStream().available(), 2);
+ assertEquals(response.getPayload().openStream().read(), 1);
+
+ BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+ assertTrue(service.shouldContinue(command, response));
+
+ verify(retryHandler, errorHandler);
+
+ // Verify that the response stream is closed and consumed
+ assertFalse(in.isOpen);
+ assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
+ assertEquals(response.getPayload().openStream().available(), 0);
+ assertEquals(response.getPayload().openStream().read(), -1);
+ }
+
+ public void testStreamIsClosedWhenNotRetrying() throws IOException {
+ MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+ HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+ response.getPayload().getContentMetadata().setContentLength(1l);
+ HttpCommand command = mockHttpCommand();
+
+ DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+ DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+ errorHandler.handleError(command, response);
+ expectLastCall();
+ expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+ replay(retryHandler, errorHandler);
+
+ // Verify the stream is open. This consumes one byte.
+ assertEquals(response.getPayload().openStream().available(), 2);
+ assertEquals(response.getPayload().openStream().read(), 1);
+
+ BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+ assertFalse(service.shouldContinue(command, response));
+
+ verify(retryHandler, errorHandler);
+
+ // Verify that the response stream is closed
+ assertFalse(in.isOpen);
+ assertTrue(response.getPayload().openStream() == in);
+ assertEquals(response.getPayload().openStream().available(), 0);
+ assertEquals(response.getPayload().openStream().read(), -1);
+ }
+
+ public void testStreamIsClosedAndBufferedInTheErrorHandlerWhenNotRetrying() throws IOException {
+ MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+ HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+ response.getPayload().getContentMetadata().setContentLength(1l);
+ HttpCommand command = mockHttpCommand();
+
+ DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+ DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+ errorHandler.handleError(command, response);
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ // This error handler will close the original stream and buffer it into memory
+ HttpResponse response = (HttpResponse) getCurrentArguments()[1];
+ closeClientButKeepContentStream(response);
+ return null;
+ }
+ });
+
+ expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+ replay(retryHandler, errorHandler);
+
+ // Verify the stream is open. This consumes one byte.
+ assertEquals(response.getPayload().openStream().available(), 2);
+ assertEquals(response.getPayload().openStream().read(), 1);
+
+ BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+ assertFalse(service.shouldContinue(command, response));
+
+ verify(retryHandler, errorHandler);
+
+ // Verify that the original response stream is closed and consumed
+ assertFalse(in.isOpen);
+ assertEquals(in.available(), 0);
+ assertEquals(in.read(), -1);
+
+ // Verify that the buffered stream is now repeatable and we can read the bytes that still have not
+ // been consumed from the original stream
+ assertTrue(response.getPayload().isRepeatable());
+ assertEquals(response.getPayload().openStream().available(), 1);
+ assertEquals(response.getPayload().openStream().read(), 0);
+ }
+
+ public void testCloseStreamCanBeCalledMoreThanOnce() throws IOException {
+ MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+ HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+ response.getPayload().getContentMetadata().setContentLength(1l);
+ HttpCommand command = mockHttpCommand();
+
+ DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+ DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+ errorHandler.handleError(command, response);
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ // This error handler will close the original stream
+ HttpResponse response = (HttpResponse) getCurrentArguments()[1];
+ releasePayload(response);
+ return null;
+ }
+ });
+
+ expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+ replay(retryHandler, errorHandler);
+
+ // Verify the stream is open. This consumes one byte.
+ assertEquals(response.getPayload().openStream().available(), 2);
+ assertEquals(response.getPayload().openStream().read(), 1);
+
+ BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+ assertFalse(service.shouldContinue(command, response));
+
+ verify(retryHandler, errorHandler);
+
+ // Verify that the response stream is closed and consumed
+ assertFalse(in.isOpen);
+ assertEquals(in.closeCount, 2); // The stream has been closed twice, but the IOException should not propagated
+ assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
+ assertEquals(response.getPayload().openStream().available(), 0);
+ assertEquals(response.getPayload().openStream().read(), -1);
+ }
+
+ private HttpCommand mockHttpCommand() {
+ return new HttpCommand(HttpRequest.builder().endpoint("http://localhost").method("mock").build());
+ }
+
+ private BaseHttpCommandExecutorService<?> mockHttpCommandExecutorService(final DelegatingRetryHandler retryHandler,
+ final DelegatingErrorHandler errorHandler) {
+ Injector injector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ Names.bindProperties(binder(), BaseHttpApiMetadata.defaultProperties());
+ bind(DelegatingRetryHandler.class).toInstance(retryHandler);
+ bind(DelegatingErrorHandler.class).toInstance(errorHandler);
+ bind(BaseHttpCommandExecutorService.class).to(MockHttpCommandExecutorService.class);
+ }
+ });
+
+ return injector.getInstance(BaseHttpCommandExecutorService.class);
+ }
+
+ private static class MockInputStream extends InputStream {
+ boolean isOpen = true;
+ int count;
+ int closeCount = 0;
+
+ public MockInputStream(int count) {
+ this.count = count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.closeCount++;
+ if (!this.isOpen) {
+ throw new IOException("The stream is already closed");
+ }
+ this.isOpen = false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (this.isOpen)
+ return (count > 0) ? --count : -1;
+ else
+ return -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (this.isOpen)
+ return count;
+ else
+ return 0;
+ }
+
+ }
+
+ private static class MockHttpCommandExecutorService extends BaseHttpCommandExecutorService<Object> {
+
+ @Inject
+ MockHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
+ DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
+ DelegatingErrorHandler errorHandler, HttpWire wire) {
+ super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
+ }
+
+ @Override
+ protected Object convert(HttpRequest request) throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ protected HttpResponse invoke(Object nativeRequest) throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ protected void cleanup(Object nativeRequest) {
+
+ }
+
+ }
+
+}