You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/14 15:44:27 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 045abc1 [GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
045abc1 is described below
commit 045abc17734cd3fdcd40c1317cca3db980ea0095
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Apr 14 08:44:19 2020 -0700
[GOBBLIN-1112] Implement a new HttpMethodRetryHandler that allows retr…
Closes #2951 from sv2000/httpRetryHandler
---
.../gobblin/configuration/ConfigurationKeys.java | 12 ++++-
.../schemareg/GobblinHttpMethodRetryHandler.java | 62 ++++++++++++++++++++++
.../gobblin/kafka/schemareg/HttpClientFactory.java | 21 +++++++-
.../metrics/kafka/KafkaAvroSchemaRegistry.java | 20 +++++--
.../GobblinHttpMethodRetryHandlerTest.java | 55 +++++++++++++++++++
.../kafka/schemareg/HttpClientFactoryTest.java | 33 ++++++++++++
6 files changed, 197 insertions(+), 6 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6075725..53bbe3c 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -824,12 +824,22 @@ public class ConfigurationKeys {
public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig";
/**
- * Kafka schema registry
+ * Kafka schema registry HTTP client configuration
*/
public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
"kafka.schema.registry.httpclient.so.timeout";
public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
"kafka.schema.registry.httpclient.conn.timeout";
+ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT =
+ "kafka.schema.registry.httpclient.methodRetryCount";
+ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED =
+ "kafka.schema.registry.httpclient.requestRetryEnabled";
+ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS =
+ "kafka.schema.registry.httpclient.methodRetryHandlerClass";
+
+ /**
+ * Kafka schema registry retry configurations
+ */
public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = "kafka.schema.registry.retry.times";
public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
"kafka.schema.registry.retry.interval.inMillis";
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
new file mode 100644
index 0000000..13ddd27
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+
+import org.apache.gobblin.annotation.Alias;
+
+/**
+ * An extension of {@link DefaultHttpMethodRetryHandler} that also retries the HTTP request when it fails with an
+ * {@link java.net.UnknownHostException} or a {@link java.net.NoRouteToHostException}, up to the configured retry count.
+ */
+@Alias (value = "gobblinhttpretryhandler")
+public class GobblinHttpMethodRetryHandler extends DefaultHttpMethodRetryHandler {
+
+ public GobblinHttpMethodRetryHandler() {
+ this(3, false); // defaults: retryCount = 3, requestSentRetryEnabled = false
+ }
+
+ public GobblinHttpMethodRetryHandler(int retryCount, boolean requestSentRetryEnabled) {
+ super(retryCount, requestSentRetryEnabled);
+ }
+
+ @Override
+ public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) {
+ if (method == null) {
+ throw new IllegalArgumentException("HTTP method may not be null");
+ }
+ if (exception == null) {
+ throw new IllegalArgumentException("Exception parameter may not be null");
+ }
+ if (executionCount > super.getRetryCount()) {
+ // Give up once the execution count exceeds the configured maximum retry count.
+ return false;
+ }
+ // Unlike DefaultHttpMethodRetryHandler, treat UnknownHostException and NoRouteToHostException as retryable;
+ // every other exception is delegated to the parent implementation below.
+ if (exception instanceof UnknownHostException || exception instanceof NoRouteToHostException) {
+ return true;
+ }
+ return super.retryMethod(method, exception, executionCount);
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
index 028dcf8..4fd0a90 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/HttpClientFactory.java
@@ -17,13 +17,19 @@
package org.apache.gobblin.kafka.schemareg;
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import lombok.Setter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
/**
* An implementation of {@link BasePooledObjectFactory} for {@link HttpClient}.
@@ -34,18 +40,29 @@ public class HttpClientFactory extends BasePooledObjectFactory<HttpClient>{
@Setter private int soTimeout = -1;
@Setter private int connTimeout = -1;
+ @Setter private int httpMethodRetryCount = 3;
+ @Setter private boolean httpRequestSentRetryEnabled = false;
+ @Setter private String httpMethodRetryHandlerClass = DefaultHttpMethodRetryHandler.class.getName();
public HttpClientFactory() {
}
@Override
- public HttpClient create() throws Exception {
-
+ public HttpClient create() {
HttpClient client = new HttpClient();
if (soTimeout >= 0) {
client.getParams().setSoTimeout(soTimeout);
}
+ ClassAliasResolver<HttpMethodRetryHandler> aliasResolver = new ClassAliasResolver<>(HttpMethodRetryHandler.class);
+ HttpMethodRetryHandler httpMethodRetryHandler;
+ try {
+ httpMethodRetryHandler = GobblinConstructorUtils.invokeLongestConstructor(aliasResolver.resolveClass(httpMethodRetryHandlerClass), httpMethodRetryCount, httpRequestSentRetryEnabled);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, httpMethodRetryHandler);
+
if (connTimeout >= 0) {
client.getHttpConnectionManager().getParams().setConnectionTimeout(connTimeout);
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index c31c7c8..556c1d1 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
import org.apache.gobblin.util.AvroUtils;
-import lombok.extern.slf4j.Slf4j;
-
/**
* An implementation of {@link KafkaSchemaRegistry}.
@@ -68,7 +68,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
private final Optional<Map<String, String>> namespaceOverride;
/**
- * @param properties properties should contain property "kafka.schema.registry.url", and optionally
+ * @param props properties should contain property "kafka.schema.registry.url", and optionally
* "kafka.schema.registry.max.cache.size" (default = 1000) and
* "kafka.schema.registry.cache.expire.after.write.min" (default = 10).
*/
@@ -101,6 +101,20 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
factory.setConnTimeout(Integer.parseInt(connTimeout));
}
+ if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT)) {
+ String retryCount = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_COUNT);
+ factory.setHttpMethodRetryCount(Integer.parseInt(retryCount));
+ }
+
+ if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED)) {
+ String requestRetryEnabled = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_REQUEST_RETRY_ENABLED);
+ factory.setHttpRequestSentRetryEnabled(Boolean.parseBoolean(requestRetryEnabled));
+ }
+
+ if (this.props.containsKey(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS)) {
+ String httpMethodRetryHandlerClass = this.props.getProperty(ConfigurationKeys.KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_METHOD_RETRY_HANDLER_CLASS);
+ factory.setHttpMethodRetryHandlerClass(httpMethodRetryHandlerClass);
+ }
this.httpClientPool = new GenericObjectPool<>(factory, config);
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
new file mode 100644
index 0000000..b0fe997
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/GobblinHttpMethodRetryHandlerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpMethod;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class GobblinHttpMethodRetryHandlerTest {
+
+ @Test
+ public void testRetryMethod() {
+ GobblinHttpMethodRetryHandler gobblinHttpMethodRetryHandler = new GobblinHttpMethodRetryHandler(1, false);
+ HttpMethod mockHttpMethod = Mockito.mock(HttpMethod.class);
+
+ //GobblinHttpMethodRetryHandler.retryMethod should return true on UnknownHostException within the retry count
+ Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 0));
+ Assert.assertTrue(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 1));
+ //Return false when the configured retry count (1) is exceeded
+ Assert.assertFalse(gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new UnknownHostException("dummyException"), 2));
+
+ //Ensure the GobblinHttpMethodRetryHandler has the same behavior as the DefaultHttpMethodRetryHandler for a normal
+ //IOException
+ DefaultHttpMethodRetryHandler defaultHttpMethodRetryHandler = new DefaultHttpMethodRetryHandler(1, false);
+ boolean shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0);
+ boolean shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 0);
+ Assert.assertTrue(shouldRetryWithGobblinRetryHandler);
+ Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler);
+
+ shouldRetryWithGobblinRetryHandler = gobblinHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2);
+ shouldRetryWithDefaultRetryHandler = defaultHttpMethodRetryHandler.retryMethod(mockHttpMethod, new IOException("dummyException"), 2);
+ Assert.assertFalse(shouldRetryWithGobblinRetryHandler);
+ Assert.assertEquals(shouldRetryWithDefaultRetryHandler, shouldRetryWithGobblinRetryHandler);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
new file mode 100644
index 0000000..4d080f4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/schemareg/HttpClientFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.schemareg;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class HttpClientFactoryTest {
+
+ @Test
+ public void testCreate() {
+ HttpClientFactory httpClientFactory = new HttpClientFactory();
+ httpClientFactory.setHttpMethodRetryHandlerClass(GobblinHttpMethodRetryHandler.class.getName());
+ HttpClient client = httpClientFactory.create(); // must succeed with the custom retry handler class configured
+ Assert.assertNotNull(client);
+ }
+}
\ No newline at end of file