You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/09 00:17:58 UTC
[incubator-druid] branch master updated: integration-tests: make
ITParallelIndexTest still work in parallel (#7211)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new de55905 integration-tests: make ITParallelIndexTest still work in parallel (#7211)
de55905 is described below
commit de55905a5fd5e5093d2fd156542a2b209e2d6c58
Author: David Glasser <gl...@apollographql.com>
AuthorDate: Fri Mar 8 16:17:52 2019 -0800
integration-tests: make ITParallelIndexTest still work in parallel (#7211)
* integration-tests: make ITParallelIndexTest still work in parallel
Follow-up to #7181, which made the default behavior for index_parallel tasks
non-parallel.
* Validate that parallel index subtasks were run
---
.../clients/OverlordResourceTestClient.java | 14 ++++++++++++-
.../druid/testing/clients/TaskResponseObject.java | 9 +++++++++
.../tests/indexer/AbstractITBatchIndexTest.java | 23 ++++++++++++++++++++++
.../indexer/wikipedia_parallel_index_task.json | 4 ++++
.../indexer/wikipedia_parallel_reindex_task.json | 4 ++++
5 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 5cee2fb..f2e600e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -153,6 +153,11 @@ public class OverlordResourceTestClient
return getTasks("pendingTasks");
}
+ public List<TaskResponseObject> getCompleteTasksForDataSource(final String dataSource)
+ {
+ return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
+ }
+
private List<TaskResponseObject> getTasks(String identifier)
{
try {
@@ -233,7 +238,14 @@ public class OverlordResourceTestClient
{
try {
StatusResponseHolder response = httpClient.go(
- new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
+ new Request(
+ HttpMethod.POST,
+ new URL(StringUtils.format(
+ "%ssupervisor/%s/shutdown",
+ getIndexerURL(),
+ StringUtils.urlEncode(id)
+ ))
+ ),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 501cd5e..38f0a86 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -28,6 +28,7 @@ public class TaskResponseObject
{
private final String id;
+ private final String type;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskState status;
@@ -35,12 +36,14 @@ public class TaskResponseObject
@JsonCreator
private TaskResponseObject(
@JsonProperty("id") String id,
+ @JsonProperty("type") String type,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("status") TaskState status
)
{
this.id = id;
+ this.type = type;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
@@ -53,6 +56,12 @@ public class TaskResponseObject
}
@SuppressWarnings("unused") // Used by Jackson serialization?
+ public String getType()
+ {
+ return type;
+ }
+
+ @SuppressWarnings("unused") // Used by Jackson serialization?
public DateTime getCreatedTime()
{
return createdTime;
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 226824f..121fd7a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -160,10 +160,25 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
{
final Set<String> oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null;
+ long startSubTaskCount = -1;
+ final boolean assertRunsSubTasks = taskSpec.contains("index_parallel");
+ if (assertRunsSubTasks) {
+ startSubTaskCount = countCompleteSubTasks(dataSourceName);
+ }
+
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
+ if (assertRunsSubTasks) {
+ final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount;
+ Assert.assertTrue(
+ StringUtils.format(
+ "The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?",
+ taskID
+ ), newSubTasks > 0);
+ }
+
// ITParallelIndexTest does a second round of ingestion to replace segements in an existing
// data source. For that second round we need to make sure the coordinator actually learned
// about the new segments befor waiting for it to report that all segments are loaded; otherwise
@@ -179,4 +194,12 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
);
}
+
+ private long countCompleteSubTasks(final String dataSource)
+ {
+ return indexer.getCompleteTasksForDataSource(dataSource)
+ .stream()
+ .filter(t -> t.getType().equals("index_sub"))
+ .count();
+ }
}
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
index f317c53..887508a 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
@@ -61,6 +61,10 @@
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data*"
}
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "maxNumSubTasks": 10
}
}
}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
index c06890b..ef16c64 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
@@ -60,6 +60,10 @@
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data2*"
}
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "maxNumSubTasks": 10
}
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org