Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/04/13 12:21:00 UTC

[jira] [Work logged] (BEAM-11712) Run TPC-DS with BeamSQL and Spark runner

     [ https://issues.apache.org/jira/browse/BEAM-11712?focusedWorklogId=581721&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-581721 ]

ASF GitHub Bot logged work on BEAM-11712:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/21 12:20
            Start Date: 13/Apr/21 12:20
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #14373:
URL: https://github.com/apache/beam/pull/14373#discussion_r611011688



##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
##########
@@ -34,155 +43,196 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
+ * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and
+ * BeamSqlEnv.parseQuery to run queries.
  */
 public class BeamSqlEnvRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
-
-    private static String buildTableCreateStatement(String tableName) {
-        String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
-        return createStatement;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
+
+  private static String buildTableCreateStatement(String tableName) {
+    return "CREATE EXTERNAL TABLE "
+        + tableName
+        + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
+  }
+
+  private static String buildDataLocation(String dataSize, String tableName) {
+    return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+  }
+
+  /**
+   * Register all tables into BeamSqlEnv, set their schemas, and set the locations where their
+   * corresponding data are stored. Currently this method is not supported by ZetaSQL planner.
+   */
+  private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize)
+      throws Exception {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      String createStatement = buildTableCreateStatement(tableName);
+      String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String dataLocation = buildDataLocation(dataSize, tableName);
+      env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
     }
-
-    private static String buildDataLocation(String dataSize, String tableName) {
-        String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-        return dataLocation;
+  }
+
+  /**
+   * Register all tables into InMemoryMetaStore, set their schemas, and set the locations where
+   * their corresponding data are stored.
+   */
+  private static void registerAllTablesByInMemoryMetaStore(
+      InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
+    JSONObject properties = new JSONObject();
+    properties.put("csvformat", "InformixUnload");
+
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
+      String tableName = entry.getKey();
+      String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+      Schema tableSchema = schemaMap.get(tableName);
+      checkArgumentNotNull(tableSchema, "Table schema can't be null for table: " + tableName);
+      Table table =
+          Table.builder()
+              .name(tableName)
+              .schema(tableSchema)
+              .location(dataLocation)
+              .properties(properties)
+              .type("text")
+              .build();
+      inMemoryMetaStore.createTable(table);
     }
-
-    /** Register all tables into BeamSqlEnv, set their schemas, and set the locations where their corresponding data are stored.
-     *  Currently this method is not supported by ZetaSQL planner. */
-    private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize) throws Exception {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            String createStatement = buildTableCreateStatement(tableName);
-            String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
-            String dataLocation = buildDataLocation(dataSize, tableName);
-            env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
-        }
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /** Register all tables into InMemoryMetaStore, set their schemas, and set the locations where their corresponding data are stored. */
-    private static void registerAllTablesByInMemoryMetaStore(InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
-        JSONObject properties = new JSONObject();
-        properties.put("csvformat", "InformixUnload");
-
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        for (String tableName : schemaMap.keySet()) {
-            String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-            Schema tableSchema = schemaMap.get(tableName);
-            Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
-            inMemoryMetaStore.createTable(table);
-        }
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+  }
+
+  /**
+   * This is the alternative method used in BeamTpcds.main. It runs jobs using the
+   * BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96.)
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingBeamSqlEnv(String[] args) throws Exception {
+    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+    inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+    TpcdsOptions tpcdsOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Using ExecutorService and CompletionService to fulfill multi-threading functionality
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+    // Directly create all tables and register them into inMemoryMetaStore before creating
+    // BeamSqlEnv object.
+    registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
+
+    BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
+    BeamSqlEnv env =
+        BeamSqlEnv.builder(inMemoryMetaStore)
+            .setPipelineOptions(beamSqlPipelineOptions)
+            .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
+            .build();
+
+    // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+  // Execute all queries, transform each result into a PCollection<String>, write them into
+  // a txt file, and store them in a GCP directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line arguments, cast
+      // tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for
+      // pipeline execution.
+      TpcdsOptions tpcdsOptionsCopy =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =

Review comment:
       This extra copy is unneeded because its only use is to call `setJobName`, which is generic and part of `PipelineOptions`, so we can do it on the `TpcdsOptions` copy instead. I even wonder whether we really need that copy at all; I think we can just reuse the existing `tpcdsOptions` and call `setJobName` for each case. The original value is always preserved because we have it in `args`.
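
       For illustration, a minimal sketch of the suggested simplification, meant to replace the loop body in runUsingBeamSqlEnv (not the PR's exact code; the per-query job-name scheme here is an assumption):

           // Sketch: reuse the TpcdsOptions copy directly; setJobName() is declared on
           // PipelineOptions, so no cast to DataflowPipelineOptions is needed.
           for (int i = 0; i < queryNameArr.length; i++) {
             TpcdsOptions optionsCopy =
                 PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
             optionsCopy.setJobName(queryNameArr[i] + "-" + dataSize); // assumed naming scheme
             pipelines[i] = Pipeline.create(optionsCopy);
           }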
   

##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
##########
@@ -22,86 +22,88 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/**
- * Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid
- */
+/** Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid. */
 public class TpcdsParametersReader {
-    /** The data sizes that have been supported. */
-    private static final Set<String> supportedDataSizes = Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
-
-    /**
-     * Get and check dataSize entered by user. This dataSize has to have been supported.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return The dateSize user entered, if it is contained in supportedDataSizes set.
-     * @throws Exception
-     */
-    public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
-        String dataSize = tpcdsOptions.getDataSize();
+  /** The data sizes that have been supported. */
+  private static final Set<String> supportedDataSizes =
+      Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
 
-        if (!supportedDataSizes.contains(dataSize)) {
-            throw new Exception("The data size you entered has not been supported.");
-        }
+  /**
+   * Get and check dataSize entered by user. This dataSize has to have been supported.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return The dataSize the user entered, if it is contained in the supportedDataSizes set.
+   * @throws Exception
+   */
+  public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
+    String dataSize = tpcdsOptions.getDataSize();
 
-        return dataSize;
+    if (!supportedDataSizes.contains(dataSize)) {
+      throw new Exception("The data size you entered has not been supported.");
     }
 
-    /**
-     * Get and check queries entered by user. This has to be a string of numbers separated by commas or "all" which means run all 99 queiries.
-     * All query numbers have to be between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
-     * @throws Exception
-     */
-    public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
-        String queryNums = tpcdsOptions.getQueries();
+    return dataSize;
+  }
 
-        String[] queryNumArr;
-        if (queryNums.toLowerCase().equals("all")) {
-            // All 99 TPC-DS queries need to be executed.
-            queryNumArr = new String[99];
-            for (int i = 0; i < 99; i++) {
-                queryNumArr[i] = Integer.toString(i + 1);
-            }
-        } else {
-            // Split user input queryNums by spaces and commas, get an array of all query numbers.
-            queryNumArr = queryNums.split("[\\s,]+");
+  /**
+   * Get and check queries entered by user. This has to be a string of numbers separated by commas
+   * or "all" which means run all 99 queiries. All query numbers have to be between 1 and 99.

Review comment:
       s/queiries/queries
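
       For reference, a minimal sketch of the selection rule this Javadoc describes (illustrative only; the split regex and the "query" name prefix come from the quoted diff, and java.util.stream.IntStream is assumed to be imported):

           // "all" expands to queries 1..99; otherwise split the input on spaces/commas.
           String queryNums = "1, 2,7"; // example user input
           String[] queryNumArr =
               queryNums.toLowerCase().equals("all")
                   ? IntStream.rangeClosed(1, 99).mapToObj(String::valueOf).toArray(String[]::new)
                   : queryNums.split("[\\s,]+");
           // Each number then maps to a query name, e.g. "1" -> "query1".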





Issue Time Tracking
-------------------

    Worklog Id:     (was: 581721)
    Time Spent: 1h  (was: 50m)

> Run TPC-DS with BeamSQL and Spark runner
> ----------------------------------------
>
>                 Key: BEAM-11712
>                 URL: https://issues.apache.org/jira/browse/BEAM-11712
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Alexey Romanenko
>            Priority: P2
>          Time Spent: 1h
>  Remaining Estimate: 0h
>



