You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/13 22:34:57 UTC

[GitHub] fjy closed pull request #6614: [Backport] Use current coordinator leader instead of cached one

fjy closed pull request #6614: [Backport] Use current coordinator leader instead of cached one
URL: https://github.com/apache/incubator-druid/pull/6614
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index da2fb03b819..e4f267846ed 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -540,7 +540,7 @@ public void onFailure(Throwable t)
           @Override
           public JsonParserIterator<T> make()
           {
-            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
+            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
           }
 
           @Override
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 18535c19c39..d0c210b5b60 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -31,7 +31,9 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 
+import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,6 +52,7 @@
   private final String url;
   private final String host;
   private final ObjectMapper objectMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
 
   public JsonParserIterator(
       JavaType typeRef,
@@ -57,7 +60,8 @@ public JsonParserIterator(
       String url,
       Query<T> query,
       String host,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      BytesAccumulatingResponseHandler responseHandler
   )
   {
     this.typeRef = typeRef;
@@ -67,6 +71,7 @@ public JsonParserIterator(
     jp = null;
     this.host = host;
     this.objectMapper = objectMapper;
+    this.responseHandler = responseHandler;
   }
 
   @Override
@@ -111,6 +116,14 @@ private void init()
     if (jp == null) {
       try {
         InputStream is = future.get();
+        if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
+          throw new RE(
+              "Unexpected response status [%s] description [%s] from request url [%s]",
+              responseHandler.getStatus(),
+              responseHandler.getDescription(),
+              url
+          );
+        }
         if (is == null) {
           throw new QueryInterruptedException(
               new ResourceLimitExceededException(
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 72feb9cdbbd..f1a6b6d0eb8 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -122,11 +122,21 @@ public void stop()
 
   /**
    * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   *
+   * @param cached Uses cached leader if true, else uses the current leader
    */
-  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
-    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
+    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
+  }
+
+  /**
+   * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   */
+  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  {
+    return makeRequest(httpMethod, urlPath, true);
   }
 
   public FullResponseHolder go(Request request) throws IOException, InterruptedException
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8a1dbc87ab3..0a50878ac1d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -50,6 +50,7 @@
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -81,13 +82,14 @@
 
 public class SystemSchema extends AbstractSchema
 {
-
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
   private static final String SERVERS_TABLE = "servers";
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
+
   private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -393,7 +395,8 @@ public long getNumRows()
     try {
       request = coordinatorClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/coordinator/v1/metadata/segments")
+          StringUtils.format("/druid/coordinator/v1/metadata/segments"),
+          false
       );
     }
     catch (IOException e) {
@@ -403,6 +406,7 @@ public long getNumRows()
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
     {
     });
@@ -412,7 +416,8 @@ public long getNumRows()
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
@@ -646,7 +651,8 @@ public void close()
     try {
       request = indexingServiceClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/indexer/v1/tasks")
+          StringUtils.format("/druid/indexer/v1/tasks"),
+          false
       );
     }
     catch (IOException e) {
@@ -656,6 +662,7 @@ public void close()
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
     {
     });
@@ -665,7 +672,8 @@ public void close()
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 528d9725217..bf2eb876cbd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -336,11 +336,14 @@ public void testSegmentsTable() throws Exception
         druidSchema, client, mapper, responseHandler, authMapper).createMock();
     EasyMock.replay(segmentsTable);
 
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments")).andReturn(request).anyTimes();
+    EasyMock
+        .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
+        .andReturn(request)
+        .anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
 
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")).anyTimes();
 
@@ -591,11 +594,11 @@ public void testTasksTable() throws Exception
                                                  .withConstructor(client, mapper, responseHandler, authMapper)
                                                  .createMock();
     EasyMock.replay(tasksTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org