You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/06/24 21:18:25 UTC

calcite git commit: [CALCITE-1300] Retry on HTTP-503 in hc-based AvaticaHttpClient

Repository: calcite
Updated Branches:
  refs/heads/master 240cee445 -> 3b5d88e6a


[CALCITE-1300] Retry on HTTP-503 in hc-based AvaticaHttpClient


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/3b5d88e6
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/3b5d88e6
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/3b5d88e6

Branch: refs/heads/master
Commit: 3b5d88e6a717600f2c7c282ae7319875866017ea
Parents: 240cee4
Author: Josh Elser <el...@apache.org>
Authored: Fri Jun 24 16:47:44 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jun 24 17:13:48 2016 -0400

----------------------------------------------------------------------
 .../remote/AvaticaCommonsHttpClientImpl.java    | 69 ++++++++++-------
 .../AvaticaCommonsHttpClientImplTest.java       | 79 ++++++++++++++++++++
 2 files changed, 120 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/3b5d88e6/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
index ffb20a7..8cd5340 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
@@ -20,6 +20,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthSchemeProvider;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.AuthSchemes;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -48,6 +49,7 @@ import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -70,7 +72,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient,
   private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "100";
 
   protected final HttpHost host;
-  protected final URL url;
+  protected final URI uri;
   protected final HttpProcessor httpProcessor;
   protected final HttpRequestExecutor httpExecutor;
   protected final BasicAuthCache authCache;
@@ -83,7 +85,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient,
 
   public AvaticaCommonsHttpClientImpl(URL url) {
     this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
-    this.url = Objects.requireNonNull(url);
+    this.uri = toURI(Objects.requireNonNull(url));
 
     this.httpProcessor = HttpProcessorBuilder.create()
         .add(new RequestContent())
@@ -111,39 +113,50 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient,
   }
 
   public byte[] send(byte[] request) {
-    HttpClientContext context = HttpClientContext.create();
+    while (true) {
+      HttpClientContext context = HttpClientContext.create();
 
-    context.setTargetHost(host);
+      context.setTargetHost(host);
 
-    // Set the credentials if they were provided.
-    if (null != this.credentials) {
-      context.setCredentialsProvider(credentialsProvider);
-      context.setAuthSchemeRegistry(authRegistry);
-      context.setAuthCache(authCache);
-    }
-
-    ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);
-
-    // Create the client with the AuthSchemeRegistry and manager
-    HttpPost post = new HttpPost(toURI(url));
-    post.setEntity(entity);
-
-    try (CloseableHttpResponse response = client.execute(post, context)) {
-      final int statusCode = response.getStatusLine().getStatusCode();
-      if (HttpURLConnection.HTTP_OK == statusCode
-          || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
-        return EntityUtils.toByteArray(response.getEntity());
+      // Set the credentials if they were provided.
+      if (null != this.credentials) {
+        context.setCredentialsProvider(credentialsProvider);
+        context.setAuthSchemeRegistry(authRegistry);
+        context.setAuthCache(authCache);
       }
 
-      throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      LOG.debug("Failed to execute HTTP request", e);
-      throw new RuntimeException(e);
+      ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);
+
+      // Create the client with the AuthSchemeRegistry and manager
+      HttpPost post = new HttpPost(uri);
+      post.setEntity(entity);
+
+      try (CloseableHttpResponse response = execute(post, context)) {
+        final int statusCode = response.getStatusLine().getStatusCode();
+        if (HttpURLConnection.HTTP_OK == statusCode
+            || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
+          return EntityUtils.toByteArray(response.getEntity());
+        } else if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
+          LOG.debug("Failed to connect to server (HTTP/503), retrying");
+          continue;
+        }
+
+        throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.debug("Failed to execute HTTP request", e);
+        throw new RuntimeException(e);
+      }
     }
   }
 
+  // Visible for testing
+  CloseableHttpResponse execute(HttpPost post, HttpClientContext context)
+      throws IOException, ClientProtocolException {
+    return client.execute(post, context);
+  }
+
   @Override public void setUsernamePassword(AuthenticationType authType, String username,
       String password) {
     this.credentials = new UsernamePasswordCredentials(

http://git-wip-us.apache.org/repos/asf/calcite/blob/3b5d88e6/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java
new file mode 100644
index 0000000..b7c49a3
--- /dev/null
+++ b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImplTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.StringEntity;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.net.HttpURLConnection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Test class for {@link AvaticaCommonsHttpClientImpl}
+ */
+public class AvaticaCommonsHttpClientImplTest {
+
+  @Test public void testRetryOnHttp503() throws Exception {
+    final byte[] requestBytes = "fake_request".getBytes(UTF_8);
+    final CloseableHttpResponse badResponse = mock(CloseableHttpResponse.class);
+    final CloseableHttpResponse goodResponse = mock(CloseableHttpResponse.class);
+    final StatusLine badStatusLine = mock(StatusLine.class);
+    final StatusLine goodStatusLine = mock(StatusLine.class);
+    final StringEntity responseEntity = new StringEntity("success");
+    final Answer<CloseableHttpResponse> failThenSucceed = new Answer<CloseableHttpResponse>() {
+      private int iteration = 0;
+      @Override public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
+        iteration++;
+        if (1 == iteration) {
+          return badResponse;
+        } else {
+          return goodResponse;
+        }
+      }
+    };
+
+    final AvaticaCommonsHttpClientImpl client = mock(AvaticaCommonsHttpClientImpl.class);
+
+    when(client.send(any(byte[].class))).thenCallRealMethod();
+    when(client.execute(any(HttpPost.class), any(HttpClientContext.class))).then(failThenSucceed);
+
+    when(badResponse.getStatusLine()).thenReturn(badStatusLine);
+    when(badStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE);
+
+    when(goodResponse.getStatusLine()).thenReturn(goodStatusLine);
+    when(goodStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_OK);
+    when(goodResponse.getEntity()).thenReturn(responseEntity);
+
+    byte[] responseBytes = client.send(requestBytes);
+    assertEquals("success", new String(responseBytes, UTF_8));
+  }
+}
+
+// End AvaticaCommonsHttpClientImplTest.java