You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by na...@apache.org on 2015/03/10 22:06:18 UTC

jclouds git commit: JCLOUDS-532: Properly close HTTP streams

Repository: jclouds
Updated Branches:
  refs/heads/master 9f9394152 -> ec63b55a0


JCLOUDS-532: Properly close HTTP streams


Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/ec63b55a
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/ec63b55a
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/ec63b55a

Branch: refs/heads/master
Commit: ec63b55a04afdd5a970d8b05674deb4c3802c59c
Parents: 9f93941
Author: Ignasi Barrera <na...@apache.org>
Authored: Mon Mar 9 12:03:18 2015 +0100
Committer: Ignasi Barrera <na...@apache.org>
Committed: Tue Mar 10 21:28:14 2015 +0100

----------------------------------------------------------------------
 .../handlers/BackoffLimitedRetryHandler.java    |   2 -
 .../BaseHttpCommandExecutorService.java         |   9 +-
 .../JavaUrlHttpCommandExecutorService.java      |  16 +-
 .../BackoffLimitedRetryHandlerTest.java         |  31 +-
 .../BaseHttpCommandExecutorServiceTest.java     | 280 +++++++++++++++++++
 5 files changed, 301 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
index 481cde2..978406c 100644
--- a/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
+++ b/core/src/main/java/org/jclouds/http/handlers/BackoffLimitedRetryHandler.java
@@ -17,7 +17,6 @@
 package org.jclouds.http.handlers;
 
 import static java.lang.Math.max;
-import static org.jclouds.http.HttpUtils.releasePayload;
 
 import java.io.IOException;
 import java.util.Random;
@@ -95,7 +94,6 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler, IOException
    }
 
    public boolean shouldRetryRequest(HttpCommand command, HttpResponse response) {
-      releasePayload(response);
       return ifReplayableBackoffAndReturnTrue(command);
    }
 

http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
index 1eda6a9..e770658 100644
--- a/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
+++ b/core/src/main/java/org/jclouds/http/internal/BaseHttpCommandExecutorService.java
@@ -19,6 +19,7 @@ package org.jclouds.http.internal;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Throwables.propagate;
 import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
+import static org.jclouds.http.HttpUtils.releasePayload;
 import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
 import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
 
@@ -42,6 +43,8 @@ import org.jclouds.http.handlers.DelegatingRetryHandler;
 import org.jclouds.io.ContentMetadataCodec;
 import org.jclouds.logging.Logger;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
    protected final HttpUtils utils;
    protected final ContentMetadataCodec contentMetadataCodec;
@@ -120,13 +123,17 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
       return response;
    }
 
-   private boolean shouldContinue(HttpCommand command, HttpResponse response) {
+   @VisibleForTesting
+   boolean shouldContinue(HttpCommand command, HttpResponse response) {
       boolean shouldContinue = false;
       if (retryHandler.shouldRetryRequest(command, response)) {
          shouldContinue = true;
       } else {
          errorHandler.handleError(command, response);
       }
+      // At this point we are going to send a new request or we have just handled the error, so
+      // we should make sure that any open stream is closed.
+      releasePayload(response);
       return shouldContinue;
    }
 

http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
index bafe7f3..ce36149 100644
--- a/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
+++ b/core/src/main/java/org/jclouds/http/internal/JavaUrlHttpCommandExecutorService.java
@@ -18,7 +18,6 @@ package org.jclouds.http.internal;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Throwables.propagate;
-import static com.google.common.io.ByteStreams.toByteArray;
 import static com.google.common.net.HttpHeaders.CONTENT_LENGTH;
 import static com.google.common.net.HttpHeaders.HOST;
 import static com.google.common.net.HttpHeaders.USER_AGENT;
@@ -26,7 +25,6 @@ import static org.jclouds.http.HttpUtils.filterOutContentHeaders;
 import static org.jclouds.io.Payloads.newInputStreamPayload;
 import static org.jclouds.util.Closeables2.closeQuietly;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
@@ -96,7 +94,7 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
       try {
          in = connection.getInputStream();
       } catch (IOException e) {
-         in = bufferAndCloseStream(connection.getErrorStream());
+         in = connection.getErrorStream();
       } catch (RuntimeException e) {
          closeQuietly(in);
          throw e;
@@ -127,18 +125,6 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
       return builder.build();
    }
 
-   private InputStream bufferAndCloseStream(InputStream inputStream) throws IOException {
-      InputStream in = null;
-      try {
-         if (inputStream != null) {
-            in = new ByteArrayInputStream(toByteArray(inputStream));
-         }
-      } finally {
-         closeQuietly(inputStream);
-      }
-      return in;
-   }
-
    @Override
    protected HttpURLConnection convert(HttpRequest request) throws IOException, InterruptedException {
       boolean chunked = "chunked".equals(request.getFirstHeaderOrNull("Transfer-Encoding"));

http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
index 4e2bfe6..cc25c27 100644
--- a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
+++ b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java
@@ -19,6 +19,7 @@ package org.jclouds.http.handlers;
 import static org.jclouds.reflect.Reflection2.method;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -117,46 +118,38 @@ public class BackoffLimitedRetryHandlerTest {
    }
 
    @Test
-   void testClosesInputStream() throws InterruptedException, IOException, SecurityException, NoSuchMethodException {
+   void testInputStreamIsNotClosed() throws SecurityException, NoSuchMethodException, IOException {
       HttpCommand command = createCommand();
-
-      HttpResponse response = HttpResponse.builder().statusCode(400).build();
+      HttpResponse response = HttpResponse.builder().statusCode(500).build();
 
       InputStream inputStream = new InputStream() {
-         boolean isOpen = true;
+         int count = 2;
 
          @Override
          public void close() {
-            this.isOpen = false;
+            fail("The retry handler should not close the response stream");
          }
 
-         int count = 1;
-
          @Override
          public int read() throws IOException {
-            if (this.isOpen)
-               return (count > -1) ? count-- : -1;
-            else
-               return -1;
+            return count < 0 ? -1 : --count;
          }
 
          @Override
          public int available() throws IOException {
-            if (this.isOpen)
-               return count;
-            else
-               return 0;
+            return count < 0 ? 0 : count;
          }
       };
+
       response.setPayload(Payloads.newInputStreamPayload(inputStream));
       response.getPayload().getContentMetadata().setContentLength(1l);
-      assertEquals(response.getPayload().getInput().available(), 1);
-      assertEquals(response.getPayload().getInput().read(), 1);
+      assertEquals(response.getPayload().openStream().available(), 2);
+      assertEquals(response.getPayload().openStream().read(), 1);
 
       handler.shouldRetryRequest(command, response);
 
-      assertEquals(response.getPayload().getInput().available(), 0);
-      assertEquals(response.getPayload().getInput().read(), -1);
+      assertEquals(response.getPayload().openStream().available(), 1);
+      assertEquals(response.getPayload().openStream().read(), 0);
    }
 
    private final Function<Invocation, HttpRequest> processor = ContextBuilder

http://git-wip-us.apache.org/repos/asf/jclouds/blob/ec63b55a/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
new file mode 100644
index 0000000..8fa399e
--- /dev/null
+++ b/core/src/test/java/org/jclouds/http/internal/BaseHttpCommandExecutorServiceTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.http.internal;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jclouds.http.HttpUtils.closeClientButKeepContentStream;
+import static org.jclouds.http.HttpUtils.releasePayload;
+import static org.jclouds.io.Payloads.newInputStreamPayload;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.jclouds.http.HttpCommand;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.HttpUtils;
+import org.jclouds.http.IOExceptionRetryHandler;
+import org.jclouds.http.handlers.DelegatingErrorHandler;
+import org.jclouds.http.handlers.DelegatingRetryHandler;
+import org.jclouds.io.ContentMetadataCodec;
+import org.jclouds.rest.internal.BaseHttpApiMetadata;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+@Test(groups = "unit", testName = "BaseHttpCommandExecutorServiceTest")
+public class BaseHttpCommandExecutorServiceTest {
+
+   public void testStreamIsClosedWhenRetrying() throws IOException {
+      MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+      HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+      response.getPayload().getContentMetadata().setContentLength(1l);
+      HttpCommand command = mockHttpCommand();
+
+      DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+      DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+      expect(retryHandler.shouldRetryRequest(command, response)).andReturn(true);
+      replay(retryHandler, errorHandler);
+
+      // Verify the stream is open. This consumes one byte.
+      assertEquals(response.getPayload().openStream().available(), 2);
+      assertEquals(response.getPayload().openStream().read(), 1);
+
+      BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+      assertTrue(service.shouldContinue(command, response));
+
+      verify(retryHandler, errorHandler);
+
+      // Verify that the response stream is closed and consumed
+      assertFalse(in.isOpen);
+      assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
+      assertEquals(response.getPayload().openStream().available(), 0);
+      assertEquals(response.getPayload().openStream().read(), -1);
+   }
+
+   public void testStreamIsClosedWhenNotRetrying() throws IOException {
+      MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+      HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+      response.getPayload().getContentMetadata().setContentLength(1l);
+      HttpCommand command = mockHttpCommand();
+
+      DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+      DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+      errorHandler.handleError(command, response);
+      expectLastCall();
+      expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+      replay(retryHandler, errorHandler);
+
+      // Verify the stream is open. This consumes one byte.
+      assertEquals(response.getPayload().openStream().available(), 2);
+      assertEquals(response.getPayload().openStream().read(), 1);
+
+      BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+      assertFalse(service.shouldContinue(command, response));
+
+      verify(retryHandler, errorHandler);
+
+      // Verify that the response stream is closed
+      assertFalse(in.isOpen);
+      assertTrue(response.getPayload().openStream() == in);
+      assertEquals(response.getPayload().openStream().available(), 0);
+      assertEquals(response.getPayload().openStream().read(), -1);
+   }
+
+   public void testStreamIsClosedAndBufferedInTheErrorHandlerWhenNotRetrying() throws IOException {
+      MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+      HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+      response.getPayload().getContentMetadata().setContentLength(1l);
+      HttpCommand command = mockHttpCommand();
+
+      DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+      DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+      errorHandler.handleError(command, response);
+      expectLastCall().andAnswer(new IAnswer<Void>() {
+         @Override
+         public Void answer() throws Throwable {
+            // This error handler will close the original stream and buffer it into memory
+            HttpResponse response = (HttpResponse) getCurrentArguments()[1];
+            closeClientButKeepContentStream(response);
+            return null;
+         }
+      });
+
+      expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+      replay(retryHandler, errorHandler);
+
+      // Verify the stream is open. This consumes one byte.
+      assertEquals(response.getPayload().openStream().available(), 2);
+      assertEquals(response.getPayload().openStream().read(), 1);
+
+      BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+      assertFalse(service.shouldContinue(command, response));
+
+      verify(retryHandler, errorHandler);
+
+      // Verify that the original response stream is closed and consumed
+      assertFalse(in.isOpen);
+      assertEquals(in.available(), 0);
+      assertEquals(in.read(), -1);
+
+      // Verify that the buffered stream is now repeatable and we can read the bytes that still have not
+      // been consumed from the original stream
+      assertTrue(response.getPayload().isRepeatable());
+      assertEquals(response.getPayload().openStream().available(), 1);
+      assertEquals(response.getPayload().openStream().read(), 0);
+   }
+
+   public void testCloseStreamCanBeCalledMoreThanOnce() throws IOException {
+      MockInputStream in = new MockInputStream(2); // Input stream that produces 2 bytes
+      HttpResponse response = HttpResponse.builder().payload(newInputStreamPayload(in)).build();
+      response.getPayload().getContentMetadata().setContentLength(1l);
+      HttpCommand command = mockHttpCommand();
+
+      DelegatingRetryHandler retryHandler = EasyMock.createMock(DelegatingRetryHandler.class);
+      DelegatingErrorHandler errorHandler = EasyMock.createMock(DelegatingErrorHandler.class);
+
+      errorHandler.handleError(command, response);
+      expectLastCall().andAnswer(new IAnswer<Void>() {
+         @Override
+         public Void answer() throws Throwable {
+            // This error handler will close the original stream
+            HttpResponse response = (HttpResponse) getCurrentArguments()[1];
+            releasePayload(response);
+            return null;
+         }
+      });
+
+      expect(retryHandler.shouldRetryRequest(command, response)).andReturn(false);
+      replay(retryHandler, errorHandler);
+
+      // Verify the stream is open. This consumes one byte.
+      assertEquals(response.getPayload().openStream().available(), 2);
+      assertEquals(response.getPayload().openStream().read(), 1);
+
+      BaseHttpCommandExecutorService<?> service = mockHttpCommandExecutorService(retryHandler, errorHandler);
+      assertFalse(service.shouldContinue(command, response));
+
+      verify(retryHandler, errorHandler);
+
+      // Verify that the response stream is closed and consumed
+      assertFalse(in.isOpen);
+      assertEquals(in.closeCount, 2); // The stream has been closed twice, but the IOException should not be propagated
+      assertTrue(response.getPayload().openStream() == in); // The service shouldn't have changed it
+      assertEquals(response.getPayload().openStream().available(), 0);
+      assertEquals(response.getPayload().openStream().read(), -1);
+   }
+
+   private HttpCommand mockHttpCommand() {
+      return new HttpCommand(HttpRequest.builder().endpoint("http://localhost").method("mock").build());
+   }
+   
+   private BaseHttpCommandExecutorService<?> mockHttpCommandExecutorService(final DelegatingRetryHandler retryHandler,
+         final DelegatingErrorHandler errorHandler) {
+      Injector injector = Guice.createInjector(new AbstractModule() {
+         @Override
+         protected void configure() {
+            Names.bindProperties(binder(), BaseHttpApiMetadata.defaultProperties());
+            bind(DelegatingRetryHandler.class).toInstance(retryHandler);
+            bind(DelegatingErrorHandler.class).toInstance(errorHandler);
+            bind(BaseHttpCommandExecutorService.class).to(MockHttpCommandExecutorService.class);
+         }
+      });
+
+      return injector.getInstance(BaseHttpCommandExecutorService.class);
+   }
+
+   private static class MockInputStream extends InputStream {
+      boolean isOpen = true;
+      int count;
+      int closeCount = 0;
+
+      public MockInputStream(int count) {
+         this.count = count;
+      }
+
+      @Override
+      public void close() throws IOException {
+         this.closeCount++;
+         if (!this.isOpen) {
+            throw new IOException("The stream is already closed");
+         }
+         this.isOpen = false;
+      }
+
+      @Override
+      public int read() throws IOException {
+         if (this.isOpen)
+            return (count > 0) ? --count : -1;
+         else
+            return -1;
+      }
+
+      @Override
+      public int available() throws IOException {
+         if (this.isOpen)
+            return count;
+         else
+            return 0;
+      }
+
+   }
+
+   private static class MockHttpCommandExecutorService extends BaseHttpCommandExecutorService<Object> {
+
+      @Inject
+      MockHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
+            DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
+            DelegatingErrorHandler errorHandler, HttpWire wire) {
+         super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
+      }
+
+      @Override
+      protected Object convert(HttpRequest request) throws IOException, InterruptedException {
+         return null;
+      }
+
+      @Override
+      protected HttpResponse invoke(Object nativeRequest) throws IOException, InterruptedException {
+         return null;
+      }
+
+      @Override
+      protected void cleanup(Object nativeRequest) {
+
+      }
+
+   }
+
+}