Posted to commits@hive.apache.org by ha...@apache.org on 2018/08/09 23:45:28 UTC
hive git commit: HIVE-20353 : Follow redirects when hive connects to a passive druid overlord/coordinator (Nishant Bangarwa via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 689423981 -> 3dc736fcf
HIVE-20353 : Follow redirects when hive connects to a passive druid overlord/coordinator (Nishant Bangarwa via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3dc736fc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3dc736fc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3dc736fc
Branch: refs/heads/master
Commit: 3dc736fcf9580d272b61bc83dd3422355b44fea9
Parents: 6894239
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Thu Aug 9 16:44:56 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Aug 9 16:44:56 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 143 ++++++++++++-------
.../hive/druid/DruidStorageHandlerUtils.java | 41 ++++++
2 files changed, 129 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
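For context: a passive (non-leader) Druid overlord or coordinator answers API calls with a 307 Temporary Redirect whose Location header points at the current leader, and after this patch the storage handler follows that redirect once by re-issuing the request against the new location (see getResponseFromCurrentLeader in DruidStorageHandlerUtils below). As a rough, self-contained sketch of the same idea, here is a minimal Java example using plain java.net.HttpURLConnection rather than the Metamx HttpClient the patch actually uses; the overlord host name is hypothetical:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class FollowLeaderRedirect {

      // Issues a GET and, if the node replies with 307 Temporary Redirect
      // (what a passive Druid overlord/coordinator returns), retries once
      // against the Location header, i.e. the current leader.
      static String getFollowingLeaderRedirect(URL url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false); // handle the redirect ourselves
        if (conn.getResponseCode() == 307) {    // 307 Temporary Redirect
          String location = conn.getHeaderField("Location");
          conn.disconnect();
          conn = (HttpURLConnection) new URL(location).openConnection();
        }
        try (InputStream in = conn.getInputStream()) {
          return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical passive overlord address; it redirects to the leader.
        System.out.println(getFollowingLeaderRedirect(
            new URL("http://overlord-2:8090/druid/indexer/v1/supervisor")));
      }
    }

The patch applies this single redirect follow to the supervisor submit/reset/shutdown/status calls and to the coordinator status check, while keeping the existing IOException-based retry logic around each call.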
http://git-wip-us.apache.org/repos/asf/hive/blob/3dc736fc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 53d93e1..9f34b7b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,9 +33,8 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.Request;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
-
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
@@ -60,7 +59,6 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.storage.hdfs.HdfsDataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -108,6 +106,7 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -120,10 +119,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-
import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
/**
@@ -407,14 +405,16 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
String task = JSON_MAPPER.writeValueAsString(spec);
console.printInfo("submitting kafka Spec {}", task);
LOG.info("submitting kafka Supervisor Spec {}", task);
-
- StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST,
- new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
- .setContent(
- "application/json",
- JSON_MAPPER.writeValueAsBytes(spec)),
- new StatusResponseHandler(
- Charset.forName("UTF-8"))).get();
+ FullResponseHolder response = DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.POST,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor", overlordAddress))
+ )
+ .setContent(
+ "application/json",
+ JSON_MAPPER.writeValueAsBytes(spec)
+ ), new FullResponseHandler(
+ Charset.forName("UTF-8")));
if (response.getStatus().equals(HttpResponseStatus.OK)) {
String msg = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", spec.getDataSchema().getDataSource());
LOG.info(msg);
@@ -431,16 +431,22 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
- StatusResponseHolder response = RetryUtils
- .retry(() -> getHttpClient().go(new Request(HttpMethod.POST,
- new URL(String
- .format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress,
- dataSourceName))),
- new StatusResponseHandler(
- Charset.forName("UTF-8"))).get(),
- input -> input instanceof IOException,
- getMaxRetryCount());
- if (response.getStatus().equals(HttpResponseStatus.OK)) {
+ FullResponseHolder response = RetryUtils
+ .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+ getHttpClient(),
+ new Request(HttpMethod.POST,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor/%s/reset",
+ overlordAddress,
+ dataSourceName
+ ))
+ ), new FullResponseHandler(
+ Charset.forName("UTF-8"))
+ ),
+ input -> input instanceof IOException,
+ getMaxRetryCount()
+ );
+ if(response.getStatus().equals(HttpResponseStatus.OK)) {
console.printInfo("Druid Kafka Ingestion Reset successful.");
} else {
throw new IOException(String
@@ -454,15 +460,21 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
- StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient()
- .go(new Request(HttpMethod.POST,
- new URL(String
- .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress,
- dataSourceName))),
- new StatusResponseHandler(
- Charset.forName("UTF-8"))).get(),
- input -> input instanceof IOException,
- getMaxRetryCount());
+ FullResponseHolder response = RetryUtils
+ .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+ getHttpClient(),
+ new Request(HttpMethod.POST,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown",
+ overlordAddress,
+ dataSourceName
+ ))
+ ), new FullResponseHandler(
+ Charset.forName("UTF-8"))
+ ),
+ input -> input instanceof IOException,
+ getMaxRetryCount()
+ );
if (response.getStatus().equals(HttpResponseStatus.OK)) {
console.printInfo("Druid Kafka Ingestion shutdown successful.");
} else {
@@ -485,15 +497,22 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
- StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
- new URL(String
- .format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress,
- dataSourceName))),
- new StatusResponseHandler(
- Charset.forName("UTF-8"))).get(),
- input -> input instanceof IOException,
- getMaxRetryCount());
- if (response.getStatus().equals(HttpResponseStatus.OK)) {
+ FullResponseHolder response = RetryUtils
+ .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+ getHttpClient(),
+ new Request(HttpMethod.GET,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor/%s",
+ overlordAddress,
+ dataSourceName
+ ))
+ ), new FullResponseHandler(
+ Charset.forName("UTF-8"))
+ ),
+ input -> input instanceof IOException,
+ getMaxRetryCount()
+ );
+ if(response.getStatus().equals(HttpResponseStatus.OK)) {
return JSON_MAPPER
.readValue(response.getContent(), KafkaSupervisorSpec.class);
// Druid Returns 400 Bad Request when not found.
@@ -524,14 +543,18 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
- StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
- new URL(String
- .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress,
- dataSourceName))),
- new StatusResponseHandler(
- Charset.forName("UTF-8"))).get(),
+ FullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor/%s/status",
+ overlordAddress,
+ dataSourceName
+ ))
+ ), new FullResponseHandler(
+ Charset.forName("UTF-8"))),
input -> input instanceof IOException,
- getMaxRetryCount());
+ getMaxRetryCount()
+ );
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return DruidStorageHandlerUtils.JSON_MAPPER
.readValue(response.getContent(), KafkaSupervisorReport.class);
@@ -549,7 +572,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
return null;
}
}
-
+
/**
* Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN
*
@@ -609,9 +632,15 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
String coordinatorResponse;
try {
- coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
- new URL(String.format("http://%s/status", coordinatorAddress))
- ), input -> input instanceof IOException, maxTries);
+ coordinatorResponse = RetryUtils
+ .retry(() -> DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+ new URL(String.format("http://%s/status", coordinatorAddress))
+ ),
+ new FullResponseHandler(Charset.forName("UTF-8"))
+ ).getContent(),
+ input -> input instanceof IOException, maxTries
+ );
} catch (Exception e) {
console.printInfo(
"Will skip waiting for data loading, coordinator unavailable");
@@ -642,10 +671,14 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
while (numRetries++ < maxTries && !UrlsOfUnloadedSegments.isEmpty()) {
UrlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(UrlsOfUnloadedSegments, input -> {
try {
- String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
+ String result = DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
+ new FullResponseHandler(Charset.forName("UTF-8"))
+ ).getContent();
+
LOG.debug("Checking segment [{}] response is [{}]", input, result);
return Strings.isNullOrEmpty(result);
- } catch (IOException e) {
+ } catch (InterruptedException | ExecutionException e) {
LOG.error(String.format("Error while checking URL [%s]", input), e);
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dc736fc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 3e2a171..9da46df 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -38,6 +38,8 @@ import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
@@ -100,6 +102,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
@@ -120,6 +123,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.net.InetAddress;
+import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.sql.SQLException;
@@ -277,6 +281,43 @@ public final class DruidStorageHandlerUtils {
}
}
+ public static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request,
+ FullResponseHandler fullResponseHandler)
+ throws ExecutionException, InterruptedException {
+ FullResponseHolder responseHolder = client.go(request,
+ fullResponseHandler).get();
+ if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
+ String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
+ LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(),
+ redirectUrlStr);
+ final URL redirectUrl;
+ try {
+ redirectUrl = new URL(redirectUrlStr);
+ } catch (MalformedURLException ex) {
+ throw new ExecutionException(
+ String.format(
+ "Malformed redirect location is found in response from url[%s], new location[%s].",
+ request.getUrl(),
+ redirectUrlStr),
+ ex
+ );
+ }
+ responseHolder = client.go(withUrl(request, redirectUrl),
+ fullResponseHandler).get();
+ }
+ return responseHolder;
+ }
+
+ private static Request withUrl(Request old, URL url)
+ {
+ Request req = new Request(old.getMethod(), url);
+ req.addHeaderValues(old.getHeaders());
+ if (old.hasContent()) {
+ req.setContent(old.getContent());
+ }
+ return req;
+ }
+
/**
* @param taskDir path to the directory containing the segments descriptor info
* the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json