You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/16 17:58:23 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #12992: Add IT for MSQ task engine using the new IT framework

paul-rogers commented on code in PR #12992:
URL: https://github.com/apache/druid/pull/12992#discussion_r973248371


##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();

Review Comment:
   More to the point: this is a test. There is nothing for the test to do while the query runs. So, this code should block so the test doesn't have to include the required boilerplate. There should, however, be a timeout on the blocking call (for example, `Future.get(timeout, unit)`), so that the test fails rather than hangs if the server never responds.



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(
+              "%s%s",
+              getIndexerURL(),
+              StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
+          )
+      );
+      return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, MSQTaskReport>>()
+      {
+      });
+    }
+    catch (ISE e) {

Review Comment:
   This can catch `RuntimeException` rather than only `ISE`: there is no need to wrap other runtime exceptions.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(

Review Comment:
   I would recommend against using this utility: it masks true errors. If the server is down, it will retry 240 times instead of failing right away.
   
   Instead, do your own polling. A message timeout is an error here: we don't expect the server to be down. Any response other than the ones we expect is an error and need not be retried. Only the "not yet ready" response should be retried.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getTaskReportForMsqTask(taskId);
+  }
+
+  /**
+   * Compares the results for a given taskId. It is required that the task has produced some results that can be verified
+   */
+  private void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%]", taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    Optional<String> resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (resultsComparison.isPresent()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",
+          expectedQueryWithResults.getQuery(),
+          actualResults,
+          expectedQueryWithResults.getExpectedResults(),
+          resultsComparison.get()
+      );
+    }
+  }
+
+  /**
+   * Runs queries from files using MSQ and compares the results with the ones provided
+   */
+  @Override
+  public void testQueriesFromFile(String filePath, String fullDatasourcePath) throws Exception
+  {
+    LOG.info("Starting query tests for [%s]", filePath);

Review Comment:
   I believe we'll find that the file format here will be quite hard to maintain. Looks like it is JSON, which contains SQL, which contains JSON. While JSON is handy, this is a place where it does not serve us well.
   
   There is an open PR that proposes an "Impala-like" file format for query tests:
   
   ```text
   === sql
   SELECT ...
   === results
   {"a": 10, "b": "foo"}
   ...
   ```
   
   It is too much to ask to include that format here: the test PR is not yet merged. However, moving forward, perhaps we can retrofit that format here to make the query files a bit easier to use.
   
   In fact, we could use the entire framework: it already has mechanisms to compare expected and actual results, to loop through test cases, to run queries with differing context options, etc.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -19,13 +19,19 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class QueryResultVerifier
 {
-  public static boolean compareResults(
+  private static final Logger LOG = new Logger(QueryResultVerifier.class);
+
+  public static Optional<String> compareResults(

Review Comment:
   The approach taken in the planner test framework is to define a "test case" that has the query and expected results. Run that to get the actual results. Compare the two. The idea is to focus on the test and make the test code as simple as possible. In this case, the test code would look like:
   
   ```java
   testCase = loadTestCase(...);
   result = testCase.run(...);
   ```
   
   Here, `run()` would verify the results and fail if they don't match. (The expected results could be either an error or a set of rows.)
   



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(
+              "%s%s",
+              getIndexerURL(),
+              StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
+          )
+      );

Review Comment:
   It is a real nuisance that, although the "format JSON message, send it, and convert the response" code is repeated in dozens of places, we don't have a reusable class for it. In one of my branches I started to create such a thing; let me see if I can find it and perhaps we can use it here to avoid redundancy.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {

Review Comment:
   Perhaps return the response, error or not. This lets us write tests that verify that something fails when it should. For example, if we want to verify permissions or that the engine rejects certain invalid syntax, etc.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java:
##########
@@ -146,24 +147,29 @@ private void testQueries(String url, List<QueryResultType> queries) throws Excep
     for (QueryResultType queryWithResult : queries) {
       LOG.info("Running Query %s", queryWithResult.getQuery());
       List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
-      if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
-                                              queryWithResult.getFieldsToTest()
-      )) {
+      Optional<String> resultsComparison = QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
+                                                                                             queryWithResult.getFieldsToTest());
+      if (resultsComparison.isPresent()) {
         LOG.error(
             "Failed while executing query %s \n expectedResults: %s \n actualResults : %s",
             queryWithResult.getQuery(),
             jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
             jsonMapper.writeValueAsString(result)
         );
-        failed = true;
+        Assert.fail(StringUtils.format(
+            "Results mismatch while executing the query %s.\n"
+            + "Expected results: %s\n"
+            + "Actual results: %s\n"
+            + "Mismatch error: %s\n",
+            queryWithResult.getQuery(),
+            jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
+            jsonMapper.writeValueAsString(result),
+            resultsComparison.get()
+        ));

Review Comment:
   The above will be very hard to debug: the JSON payloads are large. This is already a problem in SQL tests when we compare queries: if they don't match, we're left with two big JSON blobs and it is tedious to determine where they differ.
   
   Results are a set of rows. So, it would be great to report, say, the number of the first row that differs, and display just those values. Then, display the actual results in the same format in which they will appear in the "expected results" file. This will allow us to develop a new test by starting with empty results, then copying the failed test results into the file, and then watching the test pass.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getTaskReportForMsqTask(taskId);
+  }
+
+  /**
+   * Compares the results for a given taskId. It is required that the task has produced some results that can be verified
+   */
+  private void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%]", taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    Optional<String> resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (resultsComparison.isPresent()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",

Review Comment:
   See note above about format.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)

Review Comment:
   There is only ever one report for MSQ, correct? I've never seen more than one.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)

Review Comment:
   This would be a handy place to have a `MsqResult` object that holds the reports and pulls out the relevant bits as needed.



##########
integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json:
##########
@@ -0,0 +1,31 @@
+[

Review Comment:
   The above file is empty. Intended?



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -19,13 +19,19 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class QueryResultVerifier
 {
-  public static boolean compareResults(
+  private static final Logger LOG = new Logger(QueryResultVerifier.class);
+
+  public static Optional<String> compareResults(

Review Comment:
   Since this is used in a test, we won't do anything with the results. The call will be either:
   
   ```java
   Assert.assertTrue(QueryResultVerifier.compareResults(...));
   ```
   
   Or just:
   
   ```java
   QueryResultVerifier.compareResults(...);
   ```
   
   Given the fact that this is a test, perhaps rename the method:
   
   ```java
   results = // run the query
   QueryResultVerifier.assertEqual(...);
   ```
   



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient

Review Comment:
   I wonder if these should simply be merged into the Overlord client. Or, be add-ons that take the overlord client as a parameter. Otherwise, we may find ourselves in need of Overlord Client X and Overlord Client Y, both as subclasses, and thus need two clients when one would do.



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)

Review Comment:
   At present, MSQ is somewhat awkward when running a SELECT: one has to paw through the reports to get the results. One can safely predict that this will change later to have some direct way to get results. So, let's not bake the current report-based results too deeply into the tests.
   
   To anticipate this, perhaps the code here should have a method to return just results. Or, a `MsqResponse` object that holds reports and lets us pull out the error (if any) and the results. The results would ideally include schema and data (pulled from the various places in the reports.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org