You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/14 15:44:27 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 045abc1  [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
045abc1 is described below

commit 045abc17734cd3fdcd40c1317cca3db980ea0095
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Apr 14 08:44:19 2020 -0700

    [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
    
    Closes #2951 from sv2000/httpRetryHandler
---
 .../gobblin/configuration/ConfigurationKeys.java   | 12 ++++-
 .../schemareg/GobblinHttpMethodRetryHandler.java   | 62 ++++++++++++++++++++++
 .../gobblin/kafka/schemareg/HttpClientFactory.java | 21 +++++++-
 .../metrics/kafka/KafkaAvroSchemaRegistry.java     | 20 +++++--
 .../GobblinHttpMethodRetryHandlerTest.java         | 55 +++++++++++++++++++
 .../kafka/schemareg/HttpClientFactoryTest.java     | 33 ++++++++++++
 6 files changed, 197 insertions(+), 6 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6075725..53bbe3c 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -824,12 +824,22 @@ public class ConfigurationKeys {
   public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig";
 
   /**
-   * Kafka schema registry
+   * Kafka schema registry HTTP client configuration
    */
   public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
       "kafka.schema.registry.httpclient.so.timeout";
   public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
       "kafka.schema.registry.httpclient.conn.timeout";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT =
+      "kafka.schema.registry.httpclient.methodRetryCount";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED =
+      "kafka.schema.registry.httpclient.requestRetryEnabled";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS =
+      "kafka.schema.registry.httpclient.methodRetryHandlerClass";
+
+  /**
+   * Kafka schema registry retry configurations
+   */
   public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = "kafka.schema.registry.retry.times";
   public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
       "kafka.schema.registry.retry.interval.inMillis";
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
new file mode 100644
index 0000000..13ddd27
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+
+import org.apache.gobblin.annotation.Alias;
+
+/**
+ * An extension of {@link DefaultHttpMethodRetryHandler} that retries the HTTP request on network errors such as
+ * {@link java.net.UnknownHostException} and {@link java.net.NoRouteToHostException}.
+ */
+@Alias (value = "gobblinhttpretryhandler")
+public class GobblinHttpMethodRetryHandler extends DefaultHttpMethodRetryHandler {
+
+  public GobblinHttpMethodRetryHandler() {
+    this(3, false);
+  }
+
+  public GobblinHttpMethodRetryHandler(int retryCount, boolean requestSentRetryEnabled) {
+    super(retryCount, requestSentRetryEnabled);
+  }
+
+  @Override
+  public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) {
+    if (method == null) {
+      throw new IllegalArgumentException("HTTP method may not be null");
+    }
+    if (exception == null) {
+      throw new IllegalArgumentException("Exception parameter may not be null");
+    }
+    if (executionCount > super.getRetryCount()) {
+      // Do not retry if over max retry count
+      return false;
+    }
+    // Override the behavior of DefaultHttpMethodRetryHandler to retry in case of UnknownHostException
+    // and NoRouteToHostException.
+    if (exception instanceof UnknownHostException || exception instanceof NoRouteToHostException) {
+      return true;
+    }
+    return super.retryMethod(method, exception, executionCount);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
index 028dcf8..4fd0a90 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
@@ -17,13 +17,19 @@
 
 package org.apache.gobblin.kafka.schemareg;
 
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 
 import lombok.Setter;
 
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
 
 /**
  * An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}.
@@ -34,18 +40,29 @@ public class HttpClientFactory extends BasePooledObjectFactory<HttpClient>{
 
   @Setter private int soTimeout = -1;
   @Setter private int connTimeout = -1;
+  @Setter private int httpMethodRetryCount = 3;
+  @Setter private boolean httpRequestSentRetryEnabled = false;
+  @Setter private String httpMethodRetryHandlerClass = DefaultHttpMethodRetryHandler.class.getName();
 
   public HttpClientFactory() {
   }
 
   @Override
-  public HttpClient create() throws Exception {
-
+  public HttpClient create() {
     HttpClient client = new HttpClient();
     if (soTimeout >= 0) {
       client.getParams().setSoTimeout(soTimeout);
     }
 
+    ClassAliasResolver<HttpMethodRetryHandler> aliasResolver = new ClassAliasResolver<>(HttpMethodRetryHandler.class);
+    HttpMethodRetryHandler httpMethodRetryHandler;
+    try {
+      httpMethodRetryHandler = GobblinConstructorUtils.invokeLongestConstructor(aliasResolver.resolveClass(httpMethodRetryHandlerClass), httpMethodRetryCount, httpRequestSentRetryEnabled);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+    client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, httpMethodRetryHandler);
+
     if (connTimeout >= 0) {
       client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout);
     }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index c31c7c8..556c1d1 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
 import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.apache.gobblin.util.AvroUtils;
 
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * An implementation of {@link KafkaSchemaRegistry}.
@@ -68,7 +68,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
   private final Optional<Map<String, String>> namespaceOverride;
 
   /**
-   * @param properties properties should contain property "kafka.schema.registry.url", and optionally
+   * @param props properties should contain property "kafka.schema.registry.url", and optionally
    * "kafka.schema.registry.max.cache.size" (default = 1000) and
    * "kafka.schema.registry.cache.expire.after.write.min" (default = 10).
    */
@@ -101,6 +101,20 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
       factory.setConnTimeout(Integer.parseInt(connTimeout));
     }
 
+    if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT)) {
+      String retryCount = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT);
+      factory.setHttpMethodRetryCount(Integer.parseInt(retryCount));
+    }
+
+    if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED)) {
+      String requestRetryEnabled = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED);
+      factory.setHttpRequestSentRetryEnabled(Boolean.parseBoolean(requestRetryEnabled));
+    }
+
+    if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS)) {
+      String httpMethodRetryHandlerClass = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS);
+      factory.setHttpMethodRetryHandlerClass(httpMethodRetryHandlerClass);
+    }
     this.httpClientPool = new GenericObjectPool<>(factory, config);
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
new file mode 100644
index 0000000..b0fe997
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GobblinHttpMethodRetryHandlerTest {
+
+  @Test
+  public void testRetryMethod() {
+    GobblinHttpMethodRetryHandler gobblinHttpMethodRetryHandler = new GobblinHttpMethodRetryHandler(1, false);
+    HttpMethod mockHttpMethod = Mockito.mock(HttpMethod.class);
+
+    //GobblinHttpMethodRetryHandler.retryMethod should return true on UnknownHostException
+    Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 0));
+    Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 1));
+    //Return false when the retry count is exceeded
+    Assert.assertFalse(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 2));
+
+    //Ensure the GobblinHttpMethodRetryHandler has the same behavior as the DefaultHttpMethodRetryHandler for a normal
+    //IOException
+    DefaultHttpMethodRetryHandler defaultHttpMethodRetryHandler = new DefaultHttpMethodRetryHandler(1, false);
+    boolean shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0);
+    boolean shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0);
+    Assert.assertTrue(shouldRetryWithGobblinRetryHandler);
+    Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler);
+
+    shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2);
+    shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2);
+    Assert.assertFalse(shouldRetryWithGobblinRetryHandler);
+    Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
new file mode 100644
index 0000000..4d080f4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class HttpClientFactoryTest {
+
+  @Test
+  public void testCreate() {
+    HttpClientFactory httpClientFactory = new HttpClientFactory();
+    httpClientFactory.setHttpMethodRetryHandlerClass(GobblinHttpMethodRetryHandler.class.getName());
+    HttpClient client = httpClientFactory.create();
+    Assert.assertNotNull(client);
+  }
+}
\ No newline at end of file