You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/22 10:40:00 UTC
[druid] branch master updated: Add IT for MSQ task engine using the new IT framework (#12992)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 728745a1d3 Add IT for MSQ task engine using the new IT framework (#12992)
728745a1d3 is described below
commit 728745a1d338b618752c96486fe6f63dd9739ab2
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Thu Sep 22 16:09:47 2022 +0530
Add IT for MSQ task engine using the new IT framework (#12992)
* first test, serde causing problems
* serde working
* insert and select check
* Add cluster annotations for MSQ test cases
* Add cluster config for MSQ
* Add MSQ config to the pom.xml
* cleanup unnecessary changes
* Remove model classes
* Comments, checkstyle, check queries from file
* fixup test case name
* build failure fix
* review changes
* build failure fix
* Trigger Build
* Log the mismatch in QueryResultsVerifier
* Trigger Build
* Change the signature of the results verifier
* review changes
* LGTM fix
* build, change pom
* Trigger Build
* Trigger Build
* trigger build with minimal pom changes
* guice fix in tests
* travis.yml
---
.travis.yml | 11 +
.../cluster/Common/environment-configs/common.env | 2 +-
.../cluster/MultiStageQuery/docker-compose.yaml | 95 ++++++++
integration-tests-ex/cases/pom.xml | 14 ++
.../druid/testsEx/categories/MultiStageQuery.java | 25 ++
.../apache/druid/testsEx/config/Initializer.java | 13 +-
.../druid/testsEx/msq/ITMultiStageQuery.java | 125 ++++++++++
.../resources/cluster/MultiStageQuery/docker.yaml | 40 ++++
.../wikipedia_msq_select_query1.json | 31 +++
integration-tests/pom.xml | 5 +
.../clients/OverlordResourceTestClient.java | 6 +-
.../clients/msq/MsqOverlordResourceTestClient.java | 79 +++++++
.../druid/testing/guice/DruidTestModule.java | 10 +
.../testing/utils/AbstractTestQueryHelper.java | 20 +-
.../druid/testing/utils/MsqQueryWithResults.java | 40 ++++
.../druid/testing/utils/MsqTestQueryHelper.java | 257 +++++++++++++++++++++
.../druid/testing/utils/QueryResultVerifier.java | 60 ++++-
.../query/ITQueryRetryTestOnMissingSegments.java | 2 +-
18 files changed, 815 insertions(+), 20 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 793fb6cf91..f8c224b06a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -688,6 +688,17 @@ jobs:
# the Druid services that did not exit normally.
script: ./it.sh travis HighAvailability
+ - &integration_tests_ex
+ name: "(Compile=openjdk8, Run=openjdk8) multi stage query tests"
+ stage: Tests - phase 2
+ jdk: openjdk8
+ services: *integration_test_services
+ env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
+ # Uses the installation defined above. Then, builds the test tools and docker image,
+ # and runs one IT. If tests fail, echoes log lines of any of
+ # the Druid services that did not exit normally.
+ script: ./it.sh travis MultiStageQuery
+
# Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR.
#- <<: *integration_tests_ex
# name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
diff --git a/integration-tests-ex/cases/cluster/Common/environment-configs/common.env b/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
index 9df377f1a8..5354128326 100644
--- a/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
+++ b/integration-tests-ex/cases/cluster/Common/environment-configs/common.env
@@ -49,7 +49,7 @@ DRUID_INSTANCE=
# variables: druid_standard_loadList defined here, and druid_test_loadList, defined
# in a docker-compose.yaml file, for any test-specific extensions.
# See compose.md for more details.
-druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions
+druid_standard_loadList=mysql-metadata-storage,druid-it-tools,druid-lookups-cached-global,druid-histogram,druid-datasketches,druid-parquet-extensions,druid-avro-extensions,druid-protobuf-extensions,druid-orc-extensions,druid-kafka-indexing-service,druid-s3-extensions,druid-multi-stage-query
# Location of Hadoop dependencies provided at runtime in the shared directory.
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
diff --git a/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml b/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml
new file mode 100644
index 0000000000..f22ff3e6dd
--- /dev/null
+++ b/integration-tests-ex/cases/cluster/MultiStageQuery/docker-compose.yaml
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+networks:
+ druid-it-net:
+ name: druid-it-net
+ ipam:
+ config:
+ - subnet: 172.172.172.0/24
+
+services:
+ zookeeper:
+ extends:
+ file: ../Common/dependencies.yaml
+ service: zookeeper
+
+ metadata:
+ extends:
+ file: ../Common/dependencies.yaml
+ service: metadata
+
+ coordinator:
+ extends:
+ file: ../Common/druid.yaml
+ service: coordinator
+ container_name: coordinator
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ - druid_manager_segments_pollDuration=PT5S
+ - druid_coordinator_period=PT10S
+ depends_on:
+ - zookeeper
+ - metadata
+
+ overlord:
+ extends:
+ file: ../Common/druid.yaml
+ service: overlord
+ container_name: overlord
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+ - metadata
+
+ broker:
+ extends:
+ file: ../Common/druid.yaml
+ service: broker
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ router:
+ extends:
+ file: ../Common/druid.yaml
+ service: router
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ historical:
+ extends:
+ file: ../Common/druid.yaml
+ service: historical
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - zookeeper
+
+ indexer:
+ extends:
+ file: ../Common/druid.yaml
+ service: indexer
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ volumes:
+ # Test data
+ - ../../resources:/resources
+ depends_on:
+ - zookeeper
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index 86713aec4e..ae2c80d0d3 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -182,6 +182,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid.extensions</groupId>
+ <artifactId>druid-multi-stage-query</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -288,6 +293,15 @@
<it.category>AzureDeepStorage</it.category>
</properties>
</profile>
+ <profile>
+ <id>IT-MultiStageQuery</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <it.category>MultiStageQuery</it.category>
+ </properties>
+ </profile>
<profile>
<id>docker-tests</id>
<activation>
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java
new file mode 100644
index 0000000000..e4a8fdc9a0
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQuery.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.categories;
+
+public class MultiStageQuery
+{
+ // Marker type with no members: used as the JUnit category in @Category(MultiStageQuery.class) on ITMultiStageQuery.
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 0c8df88686..ba19758289 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -22,15 +22,18 @@ package org.apache.druid.testsEx.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
import org.apache.druid.cli.GuiceRunnable;
import org.apache.druid.curator.CuratorModule;
import org.apache.druid.curator.discovery.DiscoveryModule;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.DruidProcessingConfigModule;
import org.apache.druid.guice.JsonConfigProvider;
@@ -82,6 +85,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
/**
* The magic needed to piece together enough of Druid to allow clients to
@@ -141,11 +145,16 @@ public class Initializer
.in(LazySingleton.class);
// Dummy DruidNode instance to make Guice happy. This instance is unused.
+ DruidNode dummy = new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false);
binder
.bind(DruidNode.class)
.annotatedWith(Self.class)
- .toInstance(
- new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false));
+ .toInstance(dummy);
+
+ // Required for MSQIndexingModule
+ binder.bind(new TypeLiteral<Set<NodeRole>>()
+ {
+ }).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
// Reduced form of SQLMetadataStorageDruidModule
String prop = SQLMetadataStorageDruidModule.PROPERTY;
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
new file mode 100644
index 0000000000..e7ec1c0e3e
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQuery
+{
+ @Inject
+ private MsqTestQueryHelper msqHelper;
+
+ @Inject
+ private SqlResourceTestClient msqClient;
+
+ @Inject
+ private IntegrationTestingConfig config;
+
+ @Inject
+ private ObjectMapper jsonMapper;
+
+ @Inject
+ private DataLoaderHelper dataLoaderHelper;
+
+ @Inject
+ private CoordinatorResourceTestClient coordinatorClient;
+
+ private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query1.json";
+
+ @Test
+ public void testMsqIngestionAndQuerying() throws Exception
+ {
+ String datasource = "dst";
+
+ // Clear up the datasource from the previous runs
+ coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+ String queryLocal =
+ StringUtils.format(
+ "INSERT INTO %s\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i [...]
+ + " )\n"
+ + ")\n"
+ + "PARTITIONED BY DAY\n"
+ + "CLUSTERED BY \"__time\"",
+ datasource
+ );
+
+ // Submit the task and wait for the datasource to get loaded
+ SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+
+ msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+ msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml b/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml
new file mode 100644
index 0000000000..8fab9c0992
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/cluster/MultiStageQuery/docker.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#-------------------------------------------------------------------------
+
+# Definition of the multi stage query test cluster.
+# See https://yaml.org/spec/1.2.2 for more about YAML
+include:
+ - /cluster/Common/zk-metastore.yaml
+
+druid:
+ coordinator:
+ instances:
+ - port: 8081
+ overlord:
+ instances:
+ - port: 8090
+ broker:
+ instances:
+ - port: 8082
+ router:
+ instances:
+ - port: 8888
+ historical:
+ instances:
+ - port: 8083
+ indexer:
+ instances:
+ - port: 8091
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json
new file mode 100644
index 0000000000..32c3d592d6
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json
@@ -0,0 +1,31 @@
+[
+ {
+ "query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM %%DATASOURCE%%",
+ "expectedResults": [
+ {
+ "__time": 1377910953000,
+ "isRobot": null,
+ "added": 57,
+ "delta": -143,
+ "deleted": 200,
+ "namespace": "article"
+ },
+ {
+ "__time": 1377919965000,
+ "isRobot": null,
+ "added": 459,
+ "delta": 330,
+ "deleted": 129,
+ "namespace": "wikipedia"
+ },
+ {
+ "__time": 1377933081000,
+ "isRobot": null,
+ "added": 123,
+ "delta": 111,
+ "deleted": 12,
+ "namespace":"article"
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 5a3166e273..bdcea9605f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -239,6 +239,11 @@
<artifactId>simple-client-sslcontext</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid.extensions</groupId>
+ <artifactId>druid-multi-stage-query</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index e3f2a98dc6..bf0f549b19 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -61,7 +61,7 @@ public class OverlordResourceTestClient
private final String indexer;
@Inject
- OverlordResourceTestClient(
+ protected OverlordResourceTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
@@ -72,7 +72,7 @@ public class OverlordResourceTestClient
this.indexer = config.getOverlordUrl();
}
- private String getIndexerURL()
+ protected String getIndexerURL()
{
return StringUtils.format(
"%s/druid/indexer/v1/",
@@ -720,7 +720,7 @@ public class OverlordResourceTestClient
}
}
- private StatusResponseHolder makeRequest(HttpMethod method, String url)
+ protected StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
StatusResponseHolder response = this.httpClient
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
new file mode 100644
index 0000000000..c9291c4532
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ tasks: adds a call that fetches a task's reports and reads them as {@link MSQTaskReport} values.
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+ private ObjectMapper jsonMapper; // NOTE(review): assigned only in the constructor within this class; could be declared final
+ // Passes all three injected arguments to the superclass, keeps the mapper, and registers MSQIndexingModule's Jackson modules on it.
+ @Inject
+ MsqOverlordResourceTestClient(
+ @Json ObjectMapper jsonMapper,
+ @TestClient HttpClient httpClient,
+ IntegrationTestingConfig config
+ )
+ {
+ super(jsonMapper, httpClient, config);
+ this.jsonMapper = jsonMapper;
+ this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules()); // NOTE(review): mutates the injected mapper instance itself; confirm this is acceptable if that instance is shared
+ }
+ // Issues a GET to <indexer URL>task/<url-encoded taskId>/reports and deserializes the response content into a Map<String, MSQTaskReport>.
+ public Map<String, MSQTaskReport> getMsqTaskReport(String taskId)
+ {
+ try {
+ StatusResponseHolder response = makeRequest(
+ HttpMethod.GET,
+ StringUtils.format(
+ "%s%s",
+ getIndexerURL(),
+ StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
+ )
+ );
+ return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, MSQTaskReport>>()
+ {
+ });
+ }
+ catch (RuntimeException e) { // unchecked exceptions are rethrown as-is so they are not wrapped a second time below
+ throw e;
+ }
+ catch (Exception e) { // any checked exception (e.g. from readValue) is wrapped, keeping it as the cause
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
index b0bef03408..cef449c782 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java
@@ -21,10 +21,13 @@ package org.apache.druid.testing.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
import org.apache.druid.curator.CuratorConfig;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.EscalatedClient;
@@ -41,6 +44,8 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.IntegrationTestingConfigProvider;
import org.apache.druid.testing.IntegrationTestingCuratorConfig;
+import java.util.Set;
+
/**
*/
public class DruidTestModule implements Module
@@ -59,6 +64,11 @@ public class DruidTestModule implements Module
binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(
new DruidNode("integration-tests", "localhost", false, 9191, null, null, true, false)
);
+
+ // Required for MSQIndexingModule
+ binder.bind(new TypeLiteral<Set<NodeRole>>()
+ {
+ }).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
}
@Provides
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
index 8c7bcb0208..7f2773a898 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
@@ -146,24 +146,28 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
for (QueryResultType queryWithResult : queries) {
LOG.info("Running Query %s", queryWithResult.getQuery());
List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
- if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
- queryWithResult.getFieldsToTest()
- )) {
+ QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
+ result,
+ queryWithResult.getExpectedResults(),
+ queryWithResult.getFieldsToTest()
+ );
+ if (!resultsComparison.isSuccess()) {
LOG.error(
"Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
queryWithResult.getQuery(),
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
jsonMapper.writeValueAsString(result)
);
- failed = true;
+ throw new ISE(
+ "Results mismatch while executing the query %s.\n"
+ + "Mismatch error: %s\n",
+ queryWithResult.getQuery(),
+ resultsComparison.getErrorMessage()
+ );
} else {
LOG.info("Results Verified for Query %s", queryWithResult.getQuery());
}
}
-
- if (failed) {
- throw new ISE("one or more queries failed");
- }
}
@SuppressWarnings("unchecked")
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java
new file mode 100644
index 0000000000..7c246aeff5
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqQueryWithResults.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MsqQueryWithResults extends AbstractQueryWithResults<String>
+{
+ // Pairs a String query with its expected result rows; Jackson builds it from the "query" and "expectedResults" JSON properties.
+ @JsonCreator
+ public MsqQueryWithResults(
+ @JsonProperty("query") String query,
+ @JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
+ )
+ {
+ super(query, expectedResults, Collections.emptyList()); // third argument is always an empty list; presumably the fields-to-test list — confirm against AbstractQueryWithResults
+ }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
new file mode 100644
index 0000000000..424d070529
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Helper class for the MSQ integration tests (ITs).
+ * It takes all the clients required to call the MSQ APIs and performs the boilerplate work of submitting queries,
+ * polling the resulting tasks and verifying their results on behalf of the tests.
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+ private final ObjectMapper jsonMapper;
+ private final IntegrationTestingConfig config;
+ private final MsqOverlordResourceTestClient overlordClient;
+ private final SqlResourceTestClient msqClient;
+
+ // Guice-injected. queryClient is passed on to the superclass; msqClient is the client submitMsqTask sends queries with
+ @Inject
+ MsqTestQueryHelper(
+ final ObjectMapper jsonMapper,
+ final SqlResourceTestClient queryClient,
+ final IntegrationTestingConfig config,
+ final MsqOverlordResourceTestClient overlordClient,
+ final SqlResourceTestClient msqClient
+ )
+ {
+ super(jsonMapper, queryClient, config);
+ this.jsonMapper = jsonMapper;
+ this.config = config;
+ this.overlordClient = overlordClient;
+ this.msqClient = msqClient;
+ }
+  /** Returns the MSQ task-submission endpoint, i.e. {@code <schemeAndHost>/druid/v2/sql/task}. */
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return schemeAndHost + "/druid/v2/sql/task";
+  }
+
+  /** Convenience overload that wraps the SQL string in a {@link SqlQuery} with default headers and parameters. */
+  public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    final SqlQuery sqlQuery = new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null);
+    // The SqlQuery overload performs the actual HTTP submission and validates the response
+    return submitMsqTask(sqlQuery);
+  }
+
+ // Typical flow: submit the task, wait for it to complete, fetch the reports, then verify the results.
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API of the broker and blocks (for at most 5 minutes) until the submission is
+   * answered. Throws an {@link ISE} unless the response is ACCEPTED and can be parsed into a {@link SqlTaskStatus}.
+   */
+  public SqlTaskStatus submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder;
+    try {
+      statusResponseHolder = responseHolderFuture.get(5, TimeUnit.MINUTES);
+    }
+    catch (TimeoutException e) {
+      throw new ISE(e, "Unable to fetch the task id for the submitted task in time.");
+    }
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      // The status is an object, not a number, so it is formatted with %s (a %d conversion cannot render it)
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status [%s], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    try {
+      return jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      // Keep the parsing failure as the cause so that the details of the malformed response are not lost
+      throw new ISE(e, "Unable to parse the response");
+    }
+  }
+
+  /**
+   * Polls the overlord client for the status of taskId until the task is complete, i.e. until
+   * {@link TaskState#isComplete()} returns true, and returns that last fetched {@link TaskState}
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          final TaskState currentState = overlordClient.getTaskStatus(taskId).getStatusCode();
+          // A missing or not yet complete state is reported as an exception, which the predicate below marks retryable
+          if (currentState == null || !currentState.isComplete()) {
+            throw new TaskStillRunningException();
+          }
+          return currentState;
+        },
+        (Throwable failure) -> failure instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+ /**
+ * Fetches the MSQ task reports of the given task from the overlord client, as a map of report key to {@link MSQTaskReport}
+ */
+ public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+ {
+ return overlordClient.getMsqTaskReport(taskId);
+ }
+
+  /**
+   * Fetches the MSQ report of taskId, turns its result rows into column name -> value maps using the report's row
+   * signature and verifies them against the expected results. The task must have produced a results report.
+   */
+  private void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%s]", taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+    // Name every value of a row after the column found at the same index of the row signature
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (!resultsComparison.isSuccess()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",
+          expectedQueryWithResults.getQuery(),
+          actualResults,
+          expectedQueryWithResults.getExpectedResults(),
+          resultsComparison.getErrorMessage()
+      );
+    }
+  }
+
+  /**
+   * Runs each query of the JSON resource at filePath as an MSQ task, waits for the task to complete and compares its
+   * results with the expected ones. Throws an {@link ISE} if a task cannot be started or completes in a failed state.
+   */
+  @Override
+  public void testQueriesFromFile(String filePath, String fullDatasourcePath) throws Exception
+  {
+    LOG.info("Starting query tests for [%s]", filePath);
+    List<MsqQueryWithResults> queries = jsonMapper.readValue(
+        TestQueryHelper.class.getResourceAsStream(filePath),
+        new TypeReference<List<MsqQueryWithResults>>()
+        {
+        }
+    );
+    for (MsqQueryWithResults queryWithResults : queries) {
+      String queryString = queryWithResults.getQuery();
+      String queryWithDatasource = StringUtils.replace(queryString, "%%DATASOURCE%%", fullDatasourcePath);
+      SqlTaskStatus sqlTaskStatus = submitMsqTask(queryWithDatasource);
+      if (sqlTaskStatus.getState().isFailure()) {
+        throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+      }
+      String taskId = sqlTaskStatus.getTaskId();
+      TaskState finalState = pollTaskIdForCompletion(taskId);
+      if (finalState.isFailure()) {
+        throw new ISE("Task [%s] completed with state [%s], there are no results to verify", taskId, finalState);
+      }
+      compareResults(taskId, queryWithResults);
+    }
+  }
+
+ private static class TaskStillRunningException extends Exception
+ {
+ // Marker thrown by the polling lambda of pollTaskIdForCompletion; its retry predicate retries only on this type
+ }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
index 2e25f11792..7d8be55bf7 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java
@@ -19,13 +19,21 @@
package org.apache.druid.testing.utils;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class QueryResultVerifier
{
- public static boolean compareResults(
+ /**
+ * Tests the actual vs the expected results for equality and returns a {@link ResultVerificationObject} containing the
+ * result of the check. If fieldsToTest is not null and non empty, only the supplied fields would be tested for
+ * equality. Else, the whole row is compared
+ */
+ public static ResultVerificationObject compareResults(
Iterable<Map<String, Object>> actual,
Iterable<Map<String, Object>> expected,
List<String> fieldsToTest
@@ -34,6 +42,7 @@ public class QueryResultVerifier
Iterator<Map<String, Object>> actualIter = actual.iterator();
Iterator<Map<String, Object>> expectedIter = expected.iterator();
+ int rowNumber = 1;
while (actualIter.hasNext() && expectedIter.hasNext()) {
Map<String, Object> actualRes = actualIter.next();
Map<String, Object> expRes = expectedIter.next();
@@ -41,19 +50,60 @@ public class QueryResultVerifier
if (fieldsToTest != null && !fieldsToTest.isEmpty()) {
for (String field : fieldsToTest) {
if (!actualRes.get(field).equals(expRes.get(field))) {
- return false;
+ String mismatchMessage = StringUtils.format(
+ "Mismatch in row no. [%d], column [%s]. Expected: [%s], Actual: [%s]",
+ rowNumber,
+ field,
+ expRes,
+ actualRes
+ );
+ return new ResultVerificationObject(mismatchMessage);
}
}
} else {
if (!actualRes.equals(expRes)) {
- return false;
+ String mismatchMessage = StringUtils.format(
+ "Mismatch in row no. [%d]. Expected: [%s], Actual: [%s]",
+ rowNumber,
+ expRes,
+ actualRes
+ );
+ return new ResultVerificationObject(mismatchMessage);
}
}
+ ++rowNumber;
}
if (actualIter.hasNext() || expectedIter.hasNext()) {
- return false;
+ String mismatchMessage =
+ StringUtils.format(
+ "Results size mismatch. The actual result contain %s rows than the expected result.",
+ actualIter.hasNext() ? "more" : "less"
+ );
+ return new ResultVerificationObject(mismatchMessage);
+ }
+ return new ResultVerificationObject(null);
+ }
+
+ public static class ResultVerificationObject
+ {
+ @Nullable
+ private final String errorMessage;
+ // Package-private constructor; compareResults passes null when the results matched and a mismatch message otherwise
+ ResultVerificationObject(@Nullable final String errorMessage)
+ {
+ this.errorMessage = errorMessage;
+ }
+ // The verification succeeded if and only if no error message was recorded
+ public boolean isSuccess()
+ {
+ return getErrorMessage() == null;
+ }
+
+ @Nullable
+ public String getErrorMessage()
+ {
+ return errorMessage;
}
- return true;
}
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
index 7d8528b05c..999ebc8c68 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
@@ -164,7 +164,7 @@ public class ITQueryRetryTestOnMissingSegments
result,
queryWithResult.getExpectedResults(),
queryWithResult.getFieldsToTest()
- )) {
+ ).isSuccess()) {
if (expectation != Expectation.INCORRECT_RESULT) {
throw new ISE(
"Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org