Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/09 00:17:58 UTC

[incubator-druid] branch master updated: integration-tests: make ITParallelIndexTest still work in parallel (#7211)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new de55905  integration-tests: make ITParallelIndexTest still work in parallel (#7211)
de55905 is described below

commit de55905a5fd5e5093d2fd156542a2b209e2d6c58
Author: David Glasser <gl...@apollographql.com>
AuthorDate: Fri Mar 8 16:17:52 2019 -0800

    integration-tests: make ITParallelIndexTest still work in parallel (#7211)
    
    * integration-tests: make ITParallelIndexTest still work in parallel
    
    Follow-up to #7181, which made the default behavior for index_parallel tasks
    non-parallel.
    
    * Validate that parallel index subtasks were run
---
 .../clients/OverlordResourceTestClient.java        | 14 ++++++++++++-
 .../druid/testing/clients/TaskResponseObject.java  |  9 +++++++++
 .../tests/indexer/AbstractITBatchIndexTest.java    | 23 ++++++++++++++++++++++
 .../indexer/wikipedia_parallel_index_task.json     |  4 ++++
 .../indexer/wikipedia_parallel_reindex_task.json   |  4 ++++
 5 files changed, 53 insertions(+), 1 deletion(-)
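
Before the per-file diffs, here is a minimal, self-contained Java sketch of the check this commit adds, for readers skimming the email. The task-type lists below are invented stand-ins for what OverlordResourceTestClient.getCompleteTasksForDataSource() would return; the real test (see AbstractITBatchIndexTest below) counts completed index_sub tasks before and after submitting the index_parallel spec and asserts the delta is positive. This is an illustration of the idea, not the committed code.

    import java.util.Arrays;
    import java.util.List;

    public class SubTaskDeltaSketch
    {
      public static void main(String[] args)
      {
        // Pretend task-type listings for one datasource, captured before and after
        // the index_parallel submission. In the real test these come from the
        // Overlord via indexer.getCompleteTasksForDataSource(dataSource).
        List<String> before = Arrays.asList("index_parallel");
        List<String> after = Arrays.asList("index_parallel", "index_sub", "index_sub", "index_parallel");

        long newSubTasks = countSubTasks(after) - countSubTasks(before);
        if (newSubTasks <= 0) {
          throw new AssertionError("The supervisor task didn't create any sub tasks. Was it executed in the parallel mode?");
        }
        System.out.println("New index_sub tasks: " + newSubTasks);
      }

      private static long countSubTasks(List<String> taskTypes)
      {
        // Same filtering idea as countCompleteSubTasks() in the test diff below.
        return taskTypes.stream().filter("index_sub"::equals).count();
      }
    }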

diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 5cee2fb..f2e600e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -153,6 +153,11 @@ public class OverlordResourceTestClient
     return getTasks("pendingTasks");
   }
 
+  public List<TaskResponseObject> getCompleteTasksForDataSource(final String dataSource)
+  {
+    return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
+  }
+
   private List<TaskResponseObject> getTasks(String identifier)
   {
     try {
@@ -233,7 +238,14 @@ public class OverlordResourceTestClient
   {
     try {
       StatusResponseHolder response = httpClient.go(
-          new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
+          new Request(
+              HttpMethod.POST,
+              new URL(StringUtils.format(
+                  "%ssupervisor/%s/shutdown",
+                  getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
           responseHandler
       ).get();
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 501cd5e..38f0a86 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -28,6 +28,7 @@ public class TaskResponseObject
 {
 
   private final String id;
+  private final String type;
   private final DateTime createdTime;
   private final DateTime queueInsertionTime;
   private final TaskState status;
@@ -35,12 +36,14 @@ public class TaskResponseObject
   @JsonCreator
   private TaskResponseObject(
       @JsonProperty("id") String id,
+      @JsonProperty("type") String type,
       @JsonProperty("createdTime") DateTime createdTime,
       @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
       @JsonProperty("status") TaskState status
   )
   {
     this.id = id;
+    this.type = type;
     this.createdTime = createdTime;
     this.queueInsertionTime = queueInsertionTime;
     this.status = status;
@@ -53,6 +56,12 @@ public class TaskResponseObject
   }
 
   @SuppressWarnings("unused") // Used by Jackson serialization?
+  public String getType()
+  {
+    return type;
+  }
+
+  @SuppressWarnings("unused") // Used by Jackson serialization?
   public DateTime getCreatedTime()
   {
     return createdTime;
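
For context on how the new "type" property flows in, here is a small, self-contained Jackson sketch. It deliberately does not use the Druid class itself: TaskEntry is a simplified, hypothetical stand-in for TaskResponseObject (whose constructor is private and whose timestamps are Joda DateTime), and the sample JSON values are invented. Only the property names ("id", "type", "createdTime", "queueInsertionTime") are taken from the annotations above.

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.List;

    public class TaskTypeDeserializationSketch
    {
      // Simplified, hypothetical stand-in for TaskResponseObject.
      static class TaskEntry
      {
        @JsonProperty("id") String id;
        @JsonProperty("type") String type;                     // the property this commit starts reading
        @JsonProperty("createdTime") String createdTime;       // String here; the real class uses DateTime
        @JsonProperty("queueInsertionTime") String queueInsertionTime;
      }

      public static void main(String[] args) throws Exception
      {
        // Invented example of one entry from a completed-task listing.
        String json = "[{\"id\": \"index_sub_wiki_abc\", \"type\": \"index_sub\","
                      + " \"createdTime\": \"2019-03-08T00:00:00.000Z\","
                      + " \"queueInsertionTime\": \"2019-03-08T00:00:01.000Z\"}]";

        List<TaskEntry> tasks = new ObjectMapper().readValue(json, new TypeReference<List<TaskEntry>>() {});

        long subTasks = tasks.stream().filter(t -> "index_sub".equals(t.type)).count();
        System.out.println("index_sub tasks: " + subTasks);    // prints 1
      }
    }
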
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 226824f..121fd7a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -160,10 +160,25 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   {
     final Set<String> oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null;
 
+    long startSubTaskCount = -1;
+    final boolean assertRunsSubTasks = taskSpec.contains("index_parallel");
+    if (assertRunsSubTasks) {
+      startSubTaskCount = countCompleteSubTasks(dataSourceName);
+    }
+
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
+    if (assertRunsSubTasks) {
+      final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount;
+      Assert.assertTrue(
+          StringUtils.format(
+              "The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?",
+              taskID
+          ), newSubTasks > 0);
+    }
+
     // ITParallelIndexTest does a second round of ingestion to replace segments in an existing
     // data source. For that second round we need to make sure the coordinator actually learned
     // about the new segments before waiting for it to report that all segments are loaded; otherwise
@@ -179,4 +194,12 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
         () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
     );
   }
+
+  private long countCompleteSubTasks(final String dataSource)
+  {
+    return indexer.getCompleteTasksForDataSource(dataSource)
+                  .stream()
+                  .filter(t -> t.getType().equals("index_sub"))
+                  .count();
+  }
 }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
index f317c53..887508a 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
@@ -61,6 +61,10 @@
                 "baseDir": "/resources/data/batch_index",
                 "filter": "wikipedia_index_data*"
             }
+        },
+        "tuningConfig": {
+            "type": "index_parallel",
+            "maxNumSubTasks": 10
         }
     }
 }
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
index c06890b..ef16c64 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
@@ -60,6 +60,10 @@
                 "baseDir": "/resources/data/batch_index",
                 "filter": "wikipedia_index_data2*"
             }
+        },
+        "tuningConfig": {
+            "type": "index_parallel",
+            "maxNumSubTasks": 10
         }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org