Posted to commits@hive.apache.org by ha...@apache.org on 2018/08/09 23:45:28 UTC

hive git commit: HIVE-20353 : Follow redirects when hive connects to a passive druid overlord/coordinator (Nishant Bangarwa via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 689423981 -> 3dc736fcf


HIVE-20353 : Follow redirects when hive connects to a passive druid overlord/coordinator (Nishant Bangarwa via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3dc736fc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3dc736fc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3dc736fc

Branch: refs/heads/master
Commit: 3dc736fcf9580d272b61bc83dd3422355b44fea9
Parents: 6894239
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Thu Aug 9 16:44:56 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Aug 9 16:44:56 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  | 143 ++++++++++++-------
 .../hive/druid/DruidStorageHandlerUtils.java    |  41 ++++++
 2 files changed, 129 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
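
HIVE-20353 addresses high-availability Druid clusters, where only one overlord (and one coordinator) is active at a time: a passive node answers the management APIs with a 307 Temporary Redirect pointing at the current leader, which the previous StatusResponseHandler-based calls surfaced as a failure. The patch switches to FullResponseHandler, whose response holder exposes the headers, and funnels every overlord/coordinator call through a new getResponseFromCurrentLeader helper that replays the request once against the Location header. Below is a minimal standalone sketch of that single-hop pattern using plain java.net.HttpURLConnection rather than the com.metamx.http.client classes the patch builds on; the class and host names are hypothetical.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class LeaderRedirectSketch {

  // Issue a request and, if a passive overlord/coordinator answers
  // 307 Temporary Redirect, replay it once against the Location header.
  static HttpURLConnection getFromCurrentLeader(URL url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setInstanceFollowRedirects(false);  // handle the redirect ourselves
    if (conn.getResponseCode() == 307) {     // TEMPORARY_REDIRECT in the patch
      String location = conn.getHeaderField("Location");
      conn.disconnect();
      conn = (HttpURLConnection) new URL(location).openConnection();
      conn.setInstanceFollowRedirects(false);
    }
    return conn;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical passive-overlord address, for illustration only.
    URL url = new URL("http://overlord-passive:8090/druid/indexer/v1/supervisor");
    System.out.println(getFromCurrentLeader(url).getResponseCode());
  }
}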


http://git-wip-us.apache.org/repos/asf/hive/blob/3dc736fc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 53d93e1..9f34b7b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,9 +33,8 @@ import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
 import com.metamx.http.client.Request;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
-
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -60,7 +59,6 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -108,6 +106,7 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -120,10 +119,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
 import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
 /**
@@ -407,14 +405,16 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
-
-      StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST,
-              new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
-              .setContent(
-                  "application/json",
-                  JSON_MAPPER.writeValueAsBytes(spec)),
-          new StatusResponseHandler(
-              Charset.forName("UTF-8"))).get();
+      FullResponseHolder response = DruidStorageHandlerUtils
+              .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.POST,
+                      new URL(String
+                              .format("http://%s/druid/indexer/v1/supervisor", overlordAddress))
+              )
+                      .setContent(
+                              "application/json",
+                              JSON_MAPPER.writeValueAsBytes(spec)
+                      ), new FullResponseHandler(
+                      Charset.forName("UTF-8")));
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         String msg = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", spec.getDataSchema().getDataSource());
         LOG.info(msg);
@@ -431,16 +431,22 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      StatusResponseHolder response = RetryUtils
-          .retry(() -> getHttpClient().go(new Request(HttpMethod.POST,
-                  new URL(String
-                      .format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress,
-                          dataSourceName))),
-              new StatusResponseHandler(
-                  Charset.forName("UTF-8"))).get(),
-              input -> input instanceof IOException,
-              getMaxRetryCount());
-      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+      FullResponseHolder response = RetryUtils
+              .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                      getHttpClient(),
+                      new Request(HttpMethod.POST,
+                              new URL(String
+                                      .format("http://%s/druid/indexer/v1/supervisor/%s/reset",
+                                              overlordAddress,
+                                              dataSourceName
+                                      ))
+                      ), new FullResponseHandler(
+                              Charset.forName("UTF-8"))
+                      ),
+                      input -> input instanceof IOException,
+                      getMaxRetryCount()
+              );
+      if(response.getStatus().equals(HttpResponseStatus.OK)) {
         console.printInfo("Druid Kafka Ingestion Reset successful.");
       } else {
         throw new IOException(String
@@ -454,15 +460,21 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient()
-              .go(new Request(HttpMethod.POST,
-                      new URL(String
-                          .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress,
-                              dataSourceName))),
-                  new StatusResponseHandler(
-                      Charset.forName("UTF-8"))).get(),
-          input -> input instanceof IOException,
-          getMaxRetryCount());
+      FullResponseHolder response = RetryUtils
+              .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                      getHttpClient(),
+                      new Request(HttpMethod.POST,
+                              new URL(String
+                                      .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown",
+                                              overlordAddress,
+                                              dataSourceName
+                                      ))
+                      ), new FullResponseHandler(
+                              Charset.forName("UTF-8"))
+                      ),
+                      input -> input instanceof IOException,
+                      getMaxRetryCount()
+              );
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         console.printInfo("Druid Kafka Ingestion shutdown successful.");
       } else {
@@ -485,15 +497,22 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
         .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
             "Druid Datasource name is null");
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
-              new URL(String
-                  .format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress,
-                      dataSourceName))),
-          new StatusResponseHandler(
-              Charset.forName("UTF-8"))).get(),
-          input -> input instanceof IOException,
-          getMaxRetryCount());
-      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+      FullResponseHolder response = RetryUtils
+              .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                      getHttpClient(),
+                      new Request(HttpMethod.GET,
+                              new URL(String
+                                      .format("http://%s/druid/indexer/v1/supervisor/%s",
+                                              overlordAddress,
+                                              dataSourceName
+                                      ))
+                      ), new FullResponseHandler(
+                              Charset.forName("UTF-8"))
+                      ),
+                      input -> input instanceof IOException,
+                      getMaxRetryCount()
+              );
+      if(response.getStatus().equals(HttpResponseStatus.OK)) {
         return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
@@ -524,14 +543,18 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
             .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
                     "Druid Datasource name is null");
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
-                      new URL(String
-                              .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress,
-                                      dataSourceName))),
-              new StatusResponseHandler(
-                      Charset.forName("UTF-8"))).get(),
+      FullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils
+                      .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+                              new URL(String
+                                      .format("http://%s/druid/indexer/v1/supervisor/%s/status",
+                                              overlordAddress,
+                                              dataSourceName
+                                      ))
+                      ), new FullResponseHandler(
+                              Charset.forName("UTF-8"))),
               input -> input instanceof IOException,
-              getMaxRetryCount());
+              getMaxRetryCount()
+      );
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         return DruidStorageHandlerUtils.JSON_MAPPER
                 .readValue(response.getContent(), KafkaSupervisorReport.class);
@@ -549,7 +572,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       return null;
     }
   }
-  
+
   /**
    * Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN
    *
@@ -609,9 +632,15 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
     String coordinatorResponse;
     try {
-      coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
-              new URL(String.format("http://%s/status", coordinatorAddress))
-      ), input -> input instanceof IOException, maxTries);
+      coordinatorResponse = RetryUtils
+              .retry(() -> DruidStorageHandlerUtils
+                              .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+                                              new URL(String.format("http://%s/status", coordinatorAddress))
+                                      ),
+                                      new FullResponseHandler(Charset.forName("UTF-8"))
+                              ).getContent(),
+                      input -> input instanceof IOException, maxTries
+              );
     } catch (Exception e) {
       console.printInfo(
               "Will skip waiting for data loading, coordinator unavailable");
@@ -642,10 +671,14 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     while (numRetries++ < maxTries && !UrlsOfUnloadedSegments.isEmpty()) {
       UrlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(UrlsOfUnloadedSegments, input -> {
         try {
-          String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
+          String result = DruidStorageHandlerUtils
+                  .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
+                          new FullResponseHandler(Charset.forName("UTF-8"))
+                  ).getContent();
+
           LOG.debug("Checking segment [{}] response is [{}]", input, result);
           return Strings.isNullOrEmpty(result);
-        } catch (IOException e) {
+        } catch (InterruptedException | ExecutionException e) {
           LOG.error(String.format("Error while checking URL [%s]", input), e);
           return true;
         }

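Every hunk above follows the same shape: the Request is built exactly as before, but instead of calling getHttpClient().go(...) directly it is routed through DruidStorageHandlerUtils.getResponseFromCurrentLeader, and RetryUtils.retry re-runs the whole call, redirect included, only when the failure is an IOException, up to getMaxRetryCount() attempts. A hand-rolled stand-in for that retry-around-the-redirect composition is sketched below; the retry helper and its names are illustrative, not Druid's RetryUtils API.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public final class RetrySketch {

  // Retry a call up to maxTries times, but only for failures the predicate accepts.
  static <T> T retry(Callable<T> call, Predicate<Throwable> shouldRetry, int maxTries)
      throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        if (attempt >= maxTries || !shouldRetry.test(e)) {
          throw e;  // exhausted, or a failure class we never retry
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for getResponseFromCurrentLeader(...): fails twice, then succeeds.
    final int[] calls = {0};
    String response = retry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("connection refused");  // transient, so retried
      }
      return "200 OK";
    }, t -> t instanceof IOException, 5);
    System.out.println(response + " after " + calls[0] + " attempts");
  }
}
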
http://git-wip-us.apache.org/repos/asf/hive/blob/3dc736fc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 3e2a171..9da46df 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -38,6 +38,8 @@ import com.metamx.emitter.core.NoopEmitter;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
 import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.StringDimensionSchema;
@@ -100,6 +102,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
@@ -120,6 +123,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.Reader;
 import java.net.InetAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
@@ -277,6 +281,43 @@ public final class DruidStorageHandlerUtils {
     }
   }
 
+  public static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request,
+          FullResponseHandler fullResponseHandler)
+          throws ExecutionException, InterruptedException {
+    FullResponseHolder responseHolder = client.go(request,
+            fullResponseHandler).get();
+    if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
+      String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
+      LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(),
+              redirectUrlStr);
+      final URL redirectUrl;
+      try {
+        redirectUrl = new URL(redirectUrlStr);
+      } catch (MalformedURLException ex) {
+        throw new ExecutionException(
+                String.format(
+                        "Malformed redirect location is found in response from url[%s], new location[%s].",
+                        request.getUrl(),
+                        redirectUrlStr),
+                ex
+        );
+      }
+      responseHolder = client.go(withUrl(request, redirectUrl),
+              fullResponseHandler).get();
+    }
+    return responseHolder;
+  }
+
+  private static Request withUrl(Request old, URL url)
+  {
+    Request req = new Request(old.getMethod(), url);
+    req.addHeaderValues(old.getHeaders());
+    if (old.hasContent()) {
+      req.setContent(old.getContent());
+    }
+    return req;
+  }
+
   /**
    * @param taskDir path to the  directory containing the segments descriptor info
    *                the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
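
Two details of the new helper are worth calling out. It follows exactly one redirect: if the replay itself comes back 307 (say, a leadership change mid-call), the holder is returned as-is and the callers above report the non-OK status. And the replay goes through withUrl, which rebuilds the request against the leader's URL while carrying over the method, headers, and body instead of reusing the already-dispatched Request. A toy sketch of that copy-with-new-target idea follows, with a hypothetical Req class standing in for com.metamx.http.client.Request.

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

public final class WithUrlSketch {

  // Toy request: method, target URL, headers, optional body.
  static final class Req {
    final String method;
    final URL url;
    final Map<String, String> headers = new HashMap<>();
    byte[] body;  // null when the request carries no content
    Req(String method, URL url) { this.method = method; this.url = url; }
  }

  // Copy everything from the old request except the target URL.
  static Req withUrl(Req old, URL url) {
    Req req = new Req(old.method, url);
    req.headers.putAll(old.headers);  // e.g. Content-Type: application/json
    if (old.body != null) {
      req.body = old.body;            // preserve the POST body for the replay
    }
    return req;
  }

  public static void main(String[] args) throws MalformedURLException {
    Req original = new Req("POST",
        new URL("http://overlord-passive:8090/druid/indexer/v1/supervisor"));
    original.headers.put("Content-Type", "application/json");
    original.body = "{}".getBytes();
    Req replay = withUrl(original,
        new URL("http://overlord-leader:8090/druid/indexer/v1/supervisor"));
    System.out.println(replay.method + " " + replay.url + " " + replay.headers);
  }
}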