You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/13 17:51:45 UTC
[incubator-druid] 01/01: Use current coordinator leader instead of cached one (#6551) (#6552)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch backport-6552-to-0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit ee81e2c8d57b95a5a70de77a436390ab24ab3f24
Author: Surekha <su...@imply.io>
AuthorDate: Tue Nov 6 13:09:51 2018 -0800
Use current coordinator leader instead of cached one (#6551) (#6552)
* Use current coordinator leader instead of cached one (#6551)
Check the response status and throw an exception if it is not OK
* Modify tests
* PR comment
* Add the correct check for status of BytesAccumulatingResponseHandler
* Move the status check into JsonParserIterator so that SQL queries output a meaningful message on failure
* Fix tests
---
.../org/apache/druid/client/DirectDruidClient.java | 2 +-
.../org/apache/druid/client/JsonParserIterator.java | 15 ++++++++++++++-
.../org/apache/druid/discovery/DruidLeaderClient.java | 14 ++++++++++++--
.../apache/druid/sql/calcite/schema/SystemSchema.java | 18 +++++++++++++-----
.../druid/sql/calcite/schema/SystemSchemaTest.java | 11 +++++++----
5 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index da2fb03..e4f2678 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -540,7 +540,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public JsonParserIterator<T> make()
{
- return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
+ return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
}
@Override
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 18535c1..d0c210b 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -31,7 +31,9 @@ import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -50,6 +52,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
private final String url;
private final String host;
private final ObjectMapper objectMapper;
+ private final BytesAccumulatingResponseHandler responseHandler;
public JsonParserIterator(
JavaType typeRef,
@@ -57,7 +60,8 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
String url,
Query<T> query,
String host,
- ObjectMapper objectMapper
+ ObjectMapper objectMapper,
+ BytesAccumulatingResponseHandler responseHandler
)
{
this.typeRef = typeRef;
@@ -67,6 +71,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
jp = null;
this.host = host;
this.objectMapper = objectMapper;
+ this.responseHandler = responseHandler;
}
@Override
@@ -111,6 +116,14 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
if (jp == null) {
try {
InputStream is = future.get();
+ if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
+ throw new RE(
+ "Unexpected response status [%s] description [%s] from request url [%s]",
+ responseHandler.getStatus(),
+ responseHandler.getDescription(),
+ url
+ );
+ }
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 72feb9c..f1a6b6d 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -122,11 +122,21 @@ public class DruidLeaderClient
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
+ *
+ * @param cached Uses cached leader if true, else uses the current leader
*/
- public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+ public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
- return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
+ return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
+ }
+
+ /**
+ * Make a Request object aimed at the cached leader; equivalent to calling makeRequest(httpMethod, urlPath, true). Throws IOException if the leader cannot be located.
+ */
+ public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+ {
+ return makeRequest(httpMethod, urlPath, true);
}
public FullResponseHolder go(Request request) throws IOException, InterruptedException
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8a1dbc8..0a50878 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -81,13 +82,14 @@ import java.util.stream.Collectors;
public class SystemSchema extends AbstractSchema
{
-
public static final String NAME = "sys";
private static final String SEGMENTS_TABLE = "segments";
private static final String SERVERS_TABLE = "servers";
private static final String SERVER_SEGMENTS_TABLE = "server_segments";
private static final String TASKS_TABLE = "tasks";
+ private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
+
private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("segment_id", ValueType.STRING)
@@ -393,7 +395,8 @@ public class SystemSchema extends AbstractSchema
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
- StringUtils.format("/druid/coordinator/v1/metadata/segments")
+ StringUtils.format("/druid/coordinator/v1/metadata/segments"),
+ false
);
}
catch (IOException e) {
@@ -403,6 +406,7 @@ public class SystemSchema extends AbstractSchema
request,
responseHandler
);
+
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
@@ -412,7 +416,8 @@ public class SystemSchema extends AbstractSchema
request.getUrl().toString(),
null,
request.getUrl().getHost(),
- jsonMapper
+ jsonMapper,
+ responseHandler
);
}
@@ -646,7 +651,8 @@ public class SystemSchema extends AbstractSchema
try {
request = indexingServiceClient.makeRequest(
HttpMethod.GET,
- StringUtils.format("/druid/indexer/v1/tasks")
+ StringUtils.format("/druid/indexer/v1/tasks"),
+ false
);
}
catch (IOException e) {
@@ -656,6 +662,7 @@ public class SystemSchema extends AbstractSchema
request,
responseHandler
);
+
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
{
});
@@ -665,7 +672,8 @@ public class SystemSchema extends AbstractSchema
request.getUrl().toString(),
null,
request.getUrl().getHost(),
- jsonMapper
+ jsonMapper,
+ responseHandler
);
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 528d972..bf2eb87 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -336,11 +336,14 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema, client, mapper, responseHandler, authMapper).createMock();
EasyMock.replay(segmentsTable);
- EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments")).andReturn(request).anyTimes();
+ EasyMock
+ .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
+ .andReturn(request)
+ .anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
- EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+ EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")).anyTimes();
@@ -591,11 +594,11 @@ public class SystemSchemaTest extends CalciteTestBase
.withConstructor(client, mapper, responseHandler, authMapper)
.createMock();
EasyMock.replay(tasksTable);
- EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+ EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
- EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+ EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org