You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/14 06:30:30 UTC

[GitHub] [druid] cryptoe commented on a diff in pull request #12992: Add IT for MSQ task engine using the new IT framework

cryptoe commented on code in PR #12992:
URL: https://github.com/apache/druid/pull/12992#discussion_r970367807


##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.MsqOverlordResourceTestClient;
+import org.apache.druid.testing.clients.MsqTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final MsqTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final MsqTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final MsqTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can
+   * specify the maximum time to poll. The method returns the last fetched {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds)
+  {
+    if (maxTimeoutSeconds < 0) {
+      throw new IAE("Timeout cannot be negative");
+    } else if (maxTimeoutSeconds == 0) {
+      maxTimeoutSeconds = Long.MAX_VALUE;
+    }
+    long time = 0;
+    do {
+      TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+      TaskState statusCode = taskStatusPlus.getStatusCode();
+      if (statusCode != null && statusCode.isComplete()) {
+        return taskStatusPlus.getStatusCode();
+      }
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId);
+      }
+    } while (time++ < maxTimeoutSeconds);
+    return overlordClient.getTaskStatus(taskId).getStatusCode();
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getTaskReportForMsqTask(taskId);
+  }
+
+  /**
+   * Compares the results for a given taskId. It is required that the task has produced some results that can be verified
+   */
+  public void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%]", taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    boolean resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (!resultsComparison) {
+      throw new IAE("Expected query result is different from the actual result");

Review Comment:
   I second Kashif's opinion. The return type of the result verifier could be "Optional<StringErrorMessage>", and then the caller can do whatever it wants with it. 
   However, one variant of the result verifier should always throw an exception on a mismatch: 
   
   void resultsEqualsOrThrow()
   
   Something like that. 



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the client APIs for MSQ and performs the boiler
+ * plate functions for the tests
+ */
+public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can
+   * specify the maximum time to poll. The method returns the last fetched {@link TaskState} of the task

Review Comment:
   Nit: The Javadoc might need to be updated here. 



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -41,17 +46,29 @@ public static boolean compareResults(
       if (fieldsToTest != null && !fieldsToTest.isEmpty()) {
         for (String field : fieldsToTest) {
           if (!actualRes.get(field).equals(expRes.get(field))) {
+            LOG.info(StringUtils.format(
+                "Field [%s] mismatch. Expected: [%s], Actual: [%s]",
+                field,
+                expRes,
+                actualRes
+            ));
             return false;
           }
         }
       } else {
         if (!actualRes.equals(expRes)) {
+          LOG.info(StringUtils.format(
+              "Row mismatch. Expected: [%s], Actual: [%s]",
+              expRes,
+              actualRes
+          ));
           return false;
         }
       }
     }
 
     if (actualIter.hasNext() || expectedIter.hasNext()) {
+      LOG.info("Results size mismatch");

Review Comment:
   If possible, please mention whether there are fewer or more results than expected. 



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -19,12 +19,17 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 public class QueryResultVerifier
 {
+  private static final Logger LOG = new Logger(QueryResultVerifier.class);
+
   public static boolean compareResults(

Review Comment:
   I think this method should throw an exception when there is a mismatch; otherwise, every caller would have to do that handling itself. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org