You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/22 10:40:00 UTC

[druid] branch master updated: Add IT for MSQ task engine using the new IT framework (#12992)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 728745a1d3 Add IT for MSQ task engine using the new IT framework (#12992)
728745a1d3 is described below

commit 728745a1d338b618752c96486fe6f63dd9739ab2
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Thu Sep 22 16:09:47 2022 +0530

    Add IT for MSQ task engine using the new IT framework (#12992)
    
    * first test, serde causing problems
    
    * serde working
    
    * insert and select check
    
    * Add cluster annotations for MSQ test cases
    
    * Add cluster config for MSQ
    
    * Add MSQ config to the pom.xml
    
    * cleanup unnecessary changes
    
    * Remove model classes
    
    * Comments, checkstyle, check queries from file
    
    * fixup test case name
    
    * build failure fix
    
    * review changes
    
    * build failure fix
    
    * Trigger Build
    
    * Log the mismatch in QueryResultsVerifier
    
    * Trigger Build
    
    * Change the signature of the results verifier
    
    * review changes
    
    * LGTM fix
    
    * build, change pom
    
    * Trigger Build
    
    * Trigger Build
    
    * trigger build with minimal pom changes
    
    * guice fix in tests
    
    * travis.yml
---
 .travis.yml                                        |  11 +
 .../cluster/Common/environment-configs/common.env  |   2 +-
 .../cluster/MultiStageQuery/docker-compose.yaml    |  95 ++++++++
 integration-tests-ex/cases/pom.xml                 |  14 ++
 .../druid/testsEx/categories/MultiStageQuery.java  |  25 ++
 .../apache/druid/testsEx/config/Initializer.java   |  13 +-
 .../druid/testsEx/msq/ITMultiStageQuery.java       | 125 ++++++++++
 .../resources/cluster/MultiStageQuery/docker.yaml  |  40 ++++
 .../wikipedia_msq_select_query1.json               |  31 +++
 integration-tests/pom.xml                          |   5 +
 .../clients/OverlordResourceTestClient.java        |   6 +-
 .../clients/msq/MsqOverlordResourceTestClient.java |  79 +++++++
 .../druid/testing/guice/DruidTestModule.java       |  10 +
 .../testing/utils/AbstractTestQueryHelper.java     |  20 +-
 .../druid/testing/utils/MsqQueryWithResults.java   |  40 ++++
 .../druid/testing/utils/MsqTestQueryHelper.java    | 257 +++++++++++++++++++++
 .../druid/testing/utils/QueryResultVerifier.java   |  60 ++++-
 .../query/ITQueryRetryTestOnMissingSegments.java   |   2 +-
 18 files changed, 815 insertions(+), 20 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 793fb6cf91..f8c224b06a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -688,6 +688,17 @@ jobs:
       # the Druid services that did not exit normally.
       script: ./it.sh travis HighAvailability
 
+    - &integration_tests_ex
+      name: "(Compile=openjdk8, Run=openjdk8) multi stage query tests"
+      stage: Tests - phase 2
+      jdk: openjdk8
+      services: *integration_test_services
+      env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
+      # Uses the installation defined above. Then, builds the test tools and docker image,
+      # and runs one IT. If tests fail, echoes log lines of any of
+      # the Druid services that did not exit normally.
+      script: ./it.sh travis MultiStageQuery
+
     # Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR.
     #- <<: *integration_tests_ex
     #  name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
diff --git a/integration-tests-ex/cases/cluster/Common/environment-configs/common.env b/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
index 9df377f1a8..5354128326 100644
--- a/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
+++ b/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
@@ -49,7 +49,7 @@ DRUID_INSTANCE=
 # variables: druid_standard_loadList defined here, and druid_test_loadList, defined
 # in a docker-compose.yaml file, for any test-specific extensions.
 # See compose.md for more details.
-druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions
+druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions,druid-multi-stage-query
 
 # Location of Hadoop dependencies provided at runtime in the shared directory.
 druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
diff --git a/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml b/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml
new file mode 100644
index 0000000000..f22ff3e6dd
--- /dev/null
+++ b/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+networks:
+  druid-it-net:
+    name: druid-it-net
+    ipam:
+      config:
+        - subnet: 172.172.172.0/24
+
+services:
+  zookeeper:
+    extends:
+      file: ../Common/dependencies.yaml
+      service: zookeeper
+
+  metadata:
+    extends:
+      file: ../Common/dependencies.yaml
+      service: metadata
+
+  coordinator:
+    extends:
+      file: ../Common/druid.yaml
+      service: coordinator
+    container_name: coordinator
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - druid_manager_segments_pollDuration=PT5S
+      - druid_coordinator_period=PT10S
+    depends_on:
+      - zookeeper
+      - metadata
+
+  overlord:
+    extends:
+      file: ../Common/druid.yaml
+      service: overlord
+    container_name: overlord
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - zookeeper
+      - metadata
+
+  broker:
+    extends:
+      file: ../Common/druid.yaml
+      service: broker
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - zookeeper
+
+  router:
+    extends:
+      file: ../Common/druid.yaml
+      service: router
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - zookeeper
+
+  historical:
+    extends:
+      file: ../Common/druid.yaml
+      service: historical
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - zookeeper
+
+  indexer:
+    extends:
+      file: ../Common/druid.yaml
+      service: indexer
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    volumes:
+      # Test data
+      - ../../resources:/resources
+    depends_on:
+      - zookeeper
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index 86713aec4e..ae2c80d0d3 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -182,6 +182,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-multi-stage-query</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -288,6 +293,15 @@
                 <it.category>AzureDeepStorage</it.category>
             </properties>
         </profile>
+        <profile>
+            <id>IT-MultiStageQuery</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <properties>
+                <it.category>MultiStageQuery</it.category>
+            </properties>
+        </profile>
         <profile>
             <id>docker-tests</id>
             <activation>
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java
new file mode 100644
index 0000000000..e4a8fdc9a0
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.categories;
+
+public class MultiStageQuery
+{
+
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 0c8df88686..ba19758289 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -22,15 +22,18 @@ package org.apache.druid.testsEx.config;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 import org.apache.druid.cli.GuiceRunnable;
 import org.apache.druid.curator.CuratorModule;
 import org.apache.druid.curator.discovery.DiscoveryModule;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.AnnouncerModule;
 import org.apache.druid.guice.DruidProcessingConfigModule;
 import org.apache.druid.guice.JsonConfigProvider;
@@ -82,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * The magic needed to piece together enough of Druid to allow clients to
@@ -141,11 +145,16 @@ public class Initializer
           .in(LazySingleton.class);
 
       // Dummy DruidNode instance to make Guice happy. This instance is unused.
+      DruidNode dummy = new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false);
       binder
           .bind(DruidNode.class)
           .annotatedWith(Self.class)
-          .toInstance(
-              new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false));
+          .toInstance(dummy);
+
+      // Required for MSQIndexingModule
+      binder.bind(new TypeLiteral<Set<NodeRole>>()
+      {
+      }).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
 
       // Reduced form of SQLMetadataStorageDruidModule
       String prop = SQLMetadataStorageDruidModule.PROPERTY;
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
new file mode 100644
index 0000000000..e7ec1c0e3e
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQuery
+{
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  private SqlResourceTestClient msqClient;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+
+  private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
+
+  @Test
+  public void testMsqIngestionAndQuerying() throws Exception
+  {
+    String datasource = "dst";
+
+    // Clean up the datasource left over from previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "INSERT INTO %s\n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i [...]
+            + "  )\n"
+            + ")\n"
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            datasource
+        );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+
+    msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+  }
+}
diff --git a/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml b/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml
new file mode 100644
index 0000000000..8fab9c0992
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#-------------------------------------------------------------------------
+
+# Definition of the multi stage query test cluster.
+# See https://yaml.org/spec/1.2.2 for more about YAML
+include:
+  - /cluster/Common/zk-metastore.yaml
+
+druid:
+  coordinator:
+    instances:
+      - port: 8081
+  overlord:
+    instances:
+      - port: 8090
+  broker:
+    instances:
+      - port: 8082
+  router:
+    instances:
+      - port: 8888
+  historical:
+    instances:
+      - port: 8083
+  indexer:
+    instances:
+      - port: 8091
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json
new file mode 100644
index 0000000000..32c3d592d6
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json
@@ -0,0 +1,31 @@
+[
+  {
+    "query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM %%DATASOURCE%%",
+    "expectedResults": [
+      {
+        "__time": 1377910953000,
+        "isRobot": null,
+        "added": 57,
+        "delta": -143,
+        "deleted": 200,
+        "namespace": "article"
+      },
+      {
+        "__time": 1377919965000,
+        "isRobot": null,
+        "added": 459,
+        "delta": 330,
+        "deleted": 129,
+        "namespace": "wikipedia"
+      },
+      {
+        "__time": 1377933081000,
+        "isRobot": null,
+        "added": 123,
+        "delta": 111,
+        "deleted": 12,
+        "namespace":"article"
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 5a3166e273..bdcea9605f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -239,6 +239,11 @@
             <artifactId>simple-client-sslcontext</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-multi-stage-query</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-services</artifactId>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index e3f2a98dc6..bf0f549b19 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -61,7 +61,7 @@ public class OverlordResourceTestClient
   private final String indexer;
 
   @Inject
-  OverlordResourceTestClient(
+  protected OverlordResourceTestClient(
       ObjectMapper jsonMapper,
       @TestClient HttpClient httpClient,
       IntegrationTestingConfig config
@@ -72,7 +72,7 @@ public class OverlordResourceTestClient
     this.indexer = config.getOverlordUrl();
   }
 
-  private String getIndexerURL()
+  protected String getIndexerURL()
   {
     return StringUtils.format(
         "%s/druid/indexer/v1/",
@@ -720,7 +720,7 @@ public class OverlordResourceTestClient
     }
   }
 
-  private StatusResponseHolder makeRequest(HttpMethod method, String url)
+  protected StatusResponseHolder makeRequest(HttpMethod method, String url)
   {
     try {
       StatusResponseHolder response = this.httpClient
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
new file mode 100644
index 0000000000..c9291c4532
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  private ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getMsqTaskReport(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(
+              "%s%s",
+              getIndexerURL(),
+              StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
+          )
+      );
+      return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, MSQTaskReport>>()
+      {
+      });
+    }
+    catch (RuntimeException e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
index b0bef03408..cef449c782 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
@@ -21,10 +21,13 @@ package org.apache.druid.testing.guice;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 import org.apache.druid.curator.CuratorConfig;
+import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.EscalatedClient;
@@ -41,6 +44,8 @@ import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.IntegrationTestingConfigProvider;
 import org.apache.druid.testing.IntegrationTestingCuratorConfig;
 
+import java.util.Set;
+
 /**
  */
 public class DruidTestModule implements Module
@@ -59,6 +64,11 @@ public class DruidTestModule implements Module
     binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(
         new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false)
     );
+
+    // Required for MSQIndexingModule
+    binder.bind(new TypeLiteral<Set<NodeRole>>()
+    {
+    }).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
   }
 
   @Provides
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
index 8c7bcb0208..7f2773a898 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
@@ -146,24 +146,28 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
     for (QueryResultType queryWithResult : queries) {
       LOG.info("Running Query %s", queryWithResult.getQuery());
       List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
-      if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
-                                              queryWithResult.getFieldsToTest()
-      )) {
+      QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
+          result,
+          queryWithResult.getExpectedResults(),
+          queryWithResult.getFieldsToTest()
+      );
+      if (!resultsComparison.isSuccess()) {
         LOG.error(
             "Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
             queryWithResult.getQuery(),
             jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
             jsonMapper.writeValueAsString(result)
         );
-        failed = true;
+        throw new ISE(
+            "Results mismatch while executing the query %s.\n"
+            + "Mismatch error: %s\n",
+            queryWithResult.getQuery(),
+            resultsComparison.getErrorMessage()
+        );
       } else {
         LOG.info("Results Verified for Query %s", queryWithResult.getQuery());
       }
     }
-
-    if (failed) {
-      throw new ISE("one or more queries failed");
-    }
   }
 
   @SuppressWarnings("unchecked")
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java
new file mode 100644
index 0000000000..7c246aeff5
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MsqQueryWithResults extends AbstractQueryWithResults<String>
+{
+
+  @JsonCreator
+  public MsqQueryWithResults(
+      @JsonProperty("query") String query,
+      @JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
+  )
+  {
+    super(query, expectedResults, Collections.emptyList());
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
new file mode 100644
index 0000000000..424d070529
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
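+/**
+ * Helper class for the MSQ (multi-stage query) integration tests.
+ * It takes the clients required to call the MSQ APIs and performs the boilerplate steps for the tests: submitting
+ * queries as tasks, waiting for the tasks to complete, fetching their reports and verifying the reported results.
+ */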
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+  // Annotated for injection; queryClient is only passed on to the superclass, msqClient is kept to submit MSQ queries
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); // appends the SQL task endpoint path
+  }
+
+  /**
+   * Submits the SQL string to the MSQ API via a {@link SqlQuery} built with null, false or empty values otherwise
+   */
+  public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Typical flow in a test: submit the task, wait for it to complete, fetch the reports, then verify the results.
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits up to 5 minutes for the task to be
+   * accepted by the cluster and returns the status parsed from the response. An {@link ISE} is thrown on timeout, on
+   * any response status other than ACCEPTED, and when the response body cannot be read as a {@link SqlTaskStatus}
+   */
+  public SqlTaskStatus submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder;
+    try {
+      statusResponseHolder = responseHolderFuture.get(5, TimeUnit.MINUTES);
+    }
+    catch (TimeoutException e) {
+      throw new ISE(e, "Unable to fetch the task id for the submitted task in time.");
+    }
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus.getCode(),
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    try {
+      return jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      // Keep the parse failure as the cause so that the reason the payload was rejected shows up in the test output
+      throw new ISE(e, "Unable to parse the response");
+    }
+  }
+
+  /**
+   * Repeatedly asks the overlord client for the status of the task with the given taskId, through RetryUtils with
+   * a limit of 100, until its {@link TaskState} is non-null and {@link TaskState#isComplete()}; returns that state
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          final TaskState latestState = overlordClient.getTaskStatus(taskId).getStatusCode();
+          if (latestState == null || !latestState.isComplete()) {
+            // Not finished yet: the retry predicate below matches only this exception type
+            throw new TaskStillRunningException();
+          }
+          return latestState;
+        },
+        (Throwable failure) -> failure instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches the MSQ task reports of the given task through the overlord client, as a map keyed by report key
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getMsqTaskReport(taskId);
+  }
+
+  /**
+   * Compares the results reported for the given taskId with the expected results of the query. The task must have
+   * produced a results report. Each reported row is keyed by the column names of the report's row signature and the
+   * rows are compared in full, in order. Throws {@link ISE} if the MSQ task report is missing and {@link IAE} if
+   * the actual and the expected results differ
+   */
+  private void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%s]", taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(taskReport.getPayload(), "payload");
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      // LinkedHashMap keeps the columns in signature order, which keeps the mismatch message readable
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (!resultsComparison.isSuccess()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",
+          expectedQueryWithResults.getQuery(),
+          actualResults,
+          expectedQueryWithResults.getExpectedResults(),
+          resultsComparison.getErrorMessage()
+      );
+    }
+  }
+
+  /**
+   * Runs every query listed in the resource file at filePath as an MSQ task, after substituting the datasource
+   * placeholder, waits for the task to complete and compares its reported results with the expected ones
+   */
+  @Override
+  public void testQueriesFromFile(String filePath, String fullDatasourcePath) throws Exception
+  {
+    LOG.info("Starting query tests for [%s]", filePath);
+    List<MsqQueryWithResults> testCases = jsonMapper.readValue(
+        TestQueryHelper.class.getResourceAsStream(filePath),
+        new TypeReference<List<MsqQueryWithResults>>()
+        {
+        }
+    );
+    for (MsqQueryWithResults testCase : testCases) {
+      SqlTaskStatus submittedTask = submitMsqTask(
+          StringUtils.replace(testCase.getQuery(), "%%DATASOURCE%%", fullDatasourcePath)
+      );
+      if (submittedTask.getState().isFailure()) {
+        throw new ISE(
+            "Unable to start the task successfully.\nPossible exception: %s",
+            submittedTask.getError()
+        );
+      }
+      String taskId = submittedTask.getTaskId();
+      pollTaskIdForCompletion(taskId);
+      compareResults(taskId, testCase);
+    }
+  }
+
+  private static class TaskStillRunningException extends Exception
+  {
+    // Marker exception thrown while polling to indicate that the task has not completed yet, so the poll is retried
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
index 2e25f11792..7d8be55bf7 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
@@ -19,13 +19,21 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 public class QueryResultVerifier
 {
-  public static boolean compareResults(
+  /**
+   * Tests the actual vs the expected results for equality and returns a {@link ResultVerificationObject} containing the
+   * result of the check. If fieldsToTest is neither null nor empty, only the supplied fields are tested for equality.
+   * Otherwise, the whole row is compared.
+   */
+  public static ResultVerificationObject compareResults(
       Iterable<Map<String, Object>> actual,
       Iterable<Map<String, Object>> expected,
       List<String> fieldsToTest
@@ -34,6 +42,7 @@ public class QueryResultVerifier
     Iterator<Map<String, Object>> actualIter = actual.iterator();
     Iterator<Map<String, Object>> expectedIter = expected.iterator();
 
+    int rowNumber = 1;
     while (actualIter.hasNext() && expectedIter.hasNext()) {
       Map<String, Object> actualRes = actualIter.next();
       Map<String, Object> expRes = expectedIter.next();
@@ -41,19 +50,60 @@ public class QueryResultVerifier
       if (fieldsToTest != null && !fieldsToTest.isEmpty()) {
         for (String field : fieldsToTest) {
           if (!actualRes.get(field).equals(expRes.get(field))) {
-            return false;
+            String mismatchMessage = StringUtils.format(
+                "Mismatch in row no. [%d], column [%s]. Expected: [%s], Actual: [%s]",
+                rowNumber,
+                field,
+                expRes,
+                actualRes
+            );
+            return new ResultVerificationObject(mismatchMessage);
           }
         }
       } else {
         if (!actualRes.equals(expRes)) {
-          return false;
+          String mismatchMessage = StringUtils.format(
+              "Mismatch in row no. [%d]. Expected: [%s], Actual: [%s]",
+              rowNumber,
+              expRes,
+              actualRes
+          );
+          return new ResultVerificationObject(mismatchMessage);
         }
       }
+      ++rowNumber;
     }
 
     if (actualIter.hasNext() || expectedIter.hasNext()) {
-      return false;
+      String mismatchMessage =
+          StringUtils.format(
+              "Results size mismatch. The actual result contain %s rows than the expected result.",
+              actualIter.hasNext() ? "more" : "less"
+          );
+      return new ResultVerificationObject(mismatchMessage);
+    }
+    return new ResultVerificationObject(null);
+  }
+
+  public static class ResultVerificationObject
+  {
+    @Nullable
+    private final String errorMessage; // description of the mismatch; null when the results matched
+
+    ResultVerificationObject(@Nullable final String errorMessage)
+    {
+      this.errorMessage = errorMessage;
+    }
+
+    public boolean isSuccess()
+    {
+      return getErrorMessage() == null; // success is represented by the absence of an error message
+    }
+
+    @Nullable
+    public String getErrorMessage()
+    {
+      return errorMessage;
     }
-    return true;
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
index 7d8528b05c..999ebc8c68 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
@@ -164,7 +164,7 @@ public class ITQueryRetryTestOnMissingSegments
             result,
             queryWithResult.getExpectedResults(),
             queryWithResult.getFieldsToTest()
-        )) {
+        ).isSuccess()) {
           if (expectation != Expectation.INCORRECT_RESULT) {
             throw new ISE(
                 "Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org