You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/13 17:51:45 UTC

[incubator-druid] 01/01: Use current coordinator leader instead of cached one (#6551) (#6552)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch backport-6552-to-0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit ee81e2c8d57b95a5a70de77a436390ab24ab3f24
Author: Surekha <su...@imply.io>
AuthorDate: Tue Nov 6 13:09:51 2018 -0800

    Use current coordinator leader instead of cached one (#6551) (#6552)
    
    * Use current coordinator leader instead of cached one (#6551)
    
    Check the response status and throw an exception if it is not OK
    
    * Modify tests
    
    * PR comment
    
    * Add the correct check for status of BytesAccumulatingResponseHandler
    
    * Move the status check into JsonParserIterator so that SQL queries output a meaningful message on failure
    
    * Fix tests
---
 .../org/apache/druid/client/DirectDruidClient.java     |  2 +-
 .../org/apache/druid/client/JsonParserIterator.java    | 15 ++++++++++++++-
 .../org/apache/druid/discovery/DruidLeaderClient.java  | 14 ++++++++++++--
 .../apache/druid/sql/calcite/schema/SystemSchema.java  | 18 +++++++++++++-----
 .../druid/sql/calcite/schema/SystemSchemaTest.java     | 11 +++++++----
 5 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index da2fb03..e4f2678 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -540,7 +540,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           @Override
           public JsonParserIterator<T> make()
           {
-            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
+            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
           }
 
           @Override
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 18535c1..d0c210b 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -31,7 +31,9 @@ import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 
+import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,6 +52,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
   private final String url;
   private final String host;
   private final ObjectMapper objectMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
 
   public JsonParserIterator(
       JavaType typeRef,
@@ -57,7 +60,8 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
       String url,
       Query<T> query,
       String host,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      BytesAccumulatingResponseHandler responseHandler
   )
   {
     this.typeRef = typeRef;
@@ -67,6 +71,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
     jp = null;
     this.host = host;
     this.objectMapper = objectMapper;
+    this.responseHandler = responseHandler;
   }
 
   @Override
@@ -111,6 +116,14 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
     if (jp == null) {
       try {
         InputStream is = future.get();
+        if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
+          throw new RE(
+              "Unexpected response status [%s] description [%s] from request url [%s]",
+              responseHandler.getStatus(),
+              responseHandler.getDescription(),
+              url
+          );
+        }
         if (is == null) {
           throw new QueryInterruptedException(
               new ResourceLimitExceededException(
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 72feb9c..f1a6b6d 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -122,11 +122,21 @@ public class DruidLeaderClient
 
   /**
    * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   *
+   * @param cached if true, uses the cached leader; otherwise uses the current leader
    */
-  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
-    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
+    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
+  }
+
+  /**
+   * Make a Request object aimed at the cached leader. Throws IOException if the leader cannot be located.
+   */
+  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  {
+    return makeRequest(httpMethod, urlPath, true);
   }
 
   public FullResponseHolder go(Request request) throws IOException, InterruptedException
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8a1dbc8..0a50878 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -81,13 +82,14 @@ import java.util.stream.Collectors;
 
 public class SystemSchema extends AbstractSchema
 {
-
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
   private static final String SERVERS_TABLE = "servers";
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
+
   private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -393,7 +395,8 @@ public class SystemSchema extends AbstractSchema
     try {
       request = coordinatorClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/coordinator/v1/metadata/segments")
+          StringUtils.format("/druid/coordinator/v1/metadata/segments"),
+          false
       );
     }
     catch (IOException e) {
@@ -403,6 +406,7 @@ public class SystemSchema extends AbstractSchema
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
     {
     });
@@ -412,7 +416,8 @@ public class SystemSchema extends AbstractSchema
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
@@ -646,7 +651,8 @@ public class SystemSchema extends AbstractSchema
     try {
       request = indexingServiceClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/indexer/v1/tasks")
+          StringUtils.format("/druid/indexer/v1/tasks"),
+          false
       );
     }
     catch (IOException e) {
@@ -656,6 +662,7 @@ public class SystemSchema extends AbstractSchema
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
     {
     });
@@ -665,7 +672,8 @@ public class SystemSchema extends AbstractSchema
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 528d972..bf2eb87 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -336,11 +336,14 @@ public class SystemSchemaTest extends CalciteTestBase
         druidSchema, client, mapper, responseHandler, authMapper).createMock();
     EasyMock.replay(segmentsTable);
 
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments")).andReturn(request).anyTimes();
+    EasyMock
+        .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
+        .andReturn(request)
+        .anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
 
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")).anyTimes();
 
@@ -591,11 +594,11 @@ public class SystemSchemaTest extends CalciteTestBase
                                                  .withConstructor(client, mapper, responseHandler, authMapper)
                                                  .createMock();
     EasyMock.replay(tasksTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org