You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/13 17:51:44 UTC

[incubator-druid] branch backport-6552-to-0.13.0-incubating created (now ee81e2c)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a change to branch backport-6552-to-0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git.


      at ee81e2c  Use current coordinator leader instead of cached one (#6551) (#6552)

This branch includes the following new commits:

     new ee81e2c  Use current coordinator leader instead of cached one (#6551) (#6552)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[incubator-druid] 01/01: Use current coordinator leader instead of cached one (#6551) (#6552)

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch backport-6552-to-0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit ee81e2c8d57b95a5a70de77a436390ab24ab3f24
Author: Surekha <su...@imply.io>
AuthorDate: Tue Nov 6 13:09:51 2018 -0800

    Use current coordinator leader instead of cached one (#6551) (#6552)
    
    * Use current coordinator leader instead of cached one (#6551)
    
    Check the response status and throw an exception if it is not OK
    
    * Modify tests
    
    * PR comment
    
    * Add the correct check for status of BytesAccumulatingResponseHandler
    
    * Move the status check into JsonParserIterator so that SQL queries output a meaningful message on failure
    
    * Fix tests
---
 .../org/apache/druid/client/DirectDruidClient.java     |  2 +-
 .../org/apache/druid/client/JsonParserIterator.java    | 15 ++++++++++++++-
 .../org/apache/druid/discovery/DruidLeaderClient.java  | 14 ++++++++++++--
 .../apache/druid/sql/calcite/schema/SystemSchema.java  | 18 +++++++++++++-----
 .../druid/sql/calcite/schema/SystemSchemaTest.java     | 11 +++++++----
 5 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index da2fb03..e4f2678 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -540,7 +540,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           @Override
           public JsonParserIterator<T> make()
           {
-            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
+            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
           }
 
           @Override
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 18535c1..d0c210b 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -31,7 +31,9 @@ import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 
+import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,6 +52,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
   private final String url;
   private final String host;
   private final ObjectMapper objectMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
 
   public JsonParserIterator(
       JavaType typeRef,
@@ -57,7 +60,8 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
       String url,
       Query<T> query,
       String host,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      BytesAccumulatingResponseHandler responseHandler
   )
   {
     this.typeRef = typeRef;
@@ -67,6 +71,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
     jp = null;
     this.host = host;
     this.objectMapper = objectMapper;
+    this.responseHandler = responseHandler;
   }
 
   @Override
@@ -111,6 +116,14 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
     if (jp == null) {
       try {
         InputStream is = future.get();
+        if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
+          throw new RE(
+              "Unexpected response status [%s] description [%s] from request url [%s]",
+              responseHandler.getStatus(),
+              responseHandler.getDescription(),
+              url
+          );
+        }
         if (is == null) {
           throw new QueryInterruptedException(
               new ResourceLimitExceededException(
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 72feb9c..f1a6b6d 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -122,11 +122,21 @@ public class DruidLeaderClient
 
   /**
    * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   *
+   * @param cached Uses cached leader if true, else uses the current leader
    */
-  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
-    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
+    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
+  }
+
+  /**
+   * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   */
+  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  {
+    return makeRequest(httpMethod, urlPath, true);
   }
 
   public FullResponseHolder go(Request request) throws IOException, InterruptedException
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8a1dbc8..0a50878 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -81,13 +82,14 @@ import java.util.stream.Collectors;
 
 public class SystemSchema extends AbstractSchema
 {
-
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
   private static final String SERVERS_TABLE = "servers";
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
+
   private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -393,7 +395,8 @@ public class SystemSchema extends AbstractSchema
     try {
       request = coordinatorClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/coordinator/v1/metadata/segments")
+          StringUtils.format("/druid/coordinator/v1/metadata/segments"),
+          false
       );
     }
     catch (IOException e) {
@@ -403,6 +406,7 @@ public class SystemSchema extends AbstractSchema
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
     {
     });
@@ -412,7 +416,8 @@ public class SystemSchema extends AbstractSchema
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
@@ -646,7 +651,8 @@ public class SystemSchema extends AbstractSchema
     try {
       request = indexingServiceClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/indexer/v1/tasks")
+          StringUtils.format("/druid/indexer/v1/tasks"),
+          false
       );
     }
     catch (IOException e) {
@@ -656,6 +662,7 @@ public class SystemSchema extends AbstractSchema
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
     {
     });
@@ -665,7 +672,8 @@ public class SystemSchema extends AbstractSchema
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 528d972..bf2eb87 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -336,11 +336,14 @@ public class SystemSchemaTest extends CalciteTestBase
         druidSchema, client, mapper, responseHandler, authMapper).createMock();
     EasyMock.replay(segmentsTable);
 
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments")).andReturn(request).anyTimes();
+    EasyMock
+        .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
+        .andReturn(request)
+        .anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
 
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")).anyTimes();
 
@@ -591,11 +594,11 @@ public class SystemSchemaTest extends CalciteTestBase
                                                  .withConstructor(client, mapper, responseHandler, authMapper)
                                                  .createMock();
     EasyMock.replay(tasksTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org