You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/06 21:09:54 UTC

[GitHub] jon-wei closed pull request #6552: Use current coordinator leader instead of cached one (#6551)

jon-wei closed pull request #6552: Use current coordinator leader instead of cached one (#6551)
URL: https://github.com/apache/incubator-druid/pull/6552
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 58b153a2b36..3f86efd18c6 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -543,7 +543,7 @@ public void onFailure(Throwable t)
           @Override
           public JsonParserIterator<T> make()
           {
-            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
+            return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
           }
 
           @Override
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 18535c19c39..d0c210b5b60 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -31,7 +31,9 @@
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 
+import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,6 +52,7 @@
   private final String url;
   private final String host;
   private final ObjectMapper objectMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
 
   public JsonParserIterator(
       JavaType typeRef,
@@ -57,7 +60,8 @@ public JsonParserIterator(
       String url,
       Query<T> query,
       String host,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      BytesAccumulatingResponseHandler responseHandler
   )
   {
     this.typeRef = typeRef;
@@ -67,6 +71,7 @@ public JsonParserIterator(
     jp = null;
     this.host = host;
     this.objectMapper = objectMapper;
+    this.responseHandler = responseHandler;
   }
 
   @Override
@@ -111,6 +116,14 @@ private void init()
     if (jp == null) {
       try {
         InputStream is = future.get();
+        if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
+          throw new RE(
+              "Unexpected response status [%s] description [%s] from request url [%s]",
+              responseHandler.getStatus(),
+              responseHandler.getDescription(),
+              url
+          );
+        }
         if (is == null) {
           throw new QueryInterruptedException(
               new ResourceLimitExceededException(
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index f0563a40b00..79d5a69917a 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -122,11 +122,21 @@ public void stop()
 
   /**
    * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   *
+   * @param cached Uses cached leader if true, else uses the current leader
    */
-  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
-    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
+    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
+  }
+
+  /**
+   * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+   */
+  public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+  {
+    return makeRequest(httpMethod, urlPath, true);
   }
 
   public FullResponseHolder go(Request request) throws IOException, InterruptedException
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 3990ded6980..1fa23884774 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -50,6 +50,7 @@
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -81,13 +82,14 @@
 
 public class SystemSchema extends AbstractSchema
 {
-
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
   private static final String SERVERS_TABLE = "servers";
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
+
   private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -402,7 +404,8 @@ public long getNumRows()
     try {
       request = coordinatorClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/coordinator/v1/metadata/segments")
+          StringUtils.format("/druid/coordinator/v1/metadata/segments"),
+          false
       );
     }
     catch (IOException e) {
@@ -412,6 +415,7 @@ public long getNumRows()
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
     {
     });
@@ -421,7 +425,8 @@ public long getNumRows()
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
@@ -659,7 +664,8 @@ public void close()
     try {
       request = indexingServiceClient.makeRequest(
           HttpMethod.GET,
-          StringUtils.format("/druid/indexer/v1/tasks")
+          StringUtils.format("/druid/indexer/v1/tasks"),
+          false
       );
     }
     catch (IOException e) {
@@ -669,6 +675,7 @@ public void close()
         request,
         responseHandler
     );
+
     final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
     {
     });
@@ -678,7 +685,8 @@ public void close()
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper
+        jsonMapper,
+        responseHandler
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index f80201e1459..3537ad62bef 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -339,13 +339,13 @@ public void testSegmentsTable() throws Exception
     EasyMock.replay(segmentsTable);
 
     EasyMock
-        .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments"))
+        .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
         .andReturn(request)
         .anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
 
     EasyMock
         .expect(request.getUrl())
@@ -599,11 +599,11 @@ public void testTasksTable() throws Exception
                                                  .withConstructor(client, mapper, responseHandler, authMapper)
                                                  .createMock();
     EasyMock.replay(tasksTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
     SettableFuture<InputStream> future = SettableFuture.create();
     EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
     final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org