You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/13 12:23:24 UTC

[beam] 02/04: [BEAM-11712] Add options for input/output paths, make it run via SparkRunner

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a407d79680d01c35760f3fe4e76cd4192e34edd1
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 18:04:22 2021 +0200

    [BEAM-11712] Add options for input/output paths, make it run via SparkRunner
---
 sdks/java/testing/tpcds/README.md                  | 68 ++++++++++++++++++++++
 sdks/java/testing/tpcds/build.gradle               | 11 +++-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    | 14 ++---
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  | 18 +-----
 .../org/apache/beam/sdk/tpcds/QueryReader.java     | 49 +---------------
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  9 +--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      | 43 +++++---------
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    | 14 +++++
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  4 +-
 9 files changed, 123 insertions(+), 107 deletions(-)

diff --git a/sdks/java/testing/tpcds/README.md b/sdks/java/testing/tpcds/README.md
new file mode 100644
index 0000000..89f8073
--- /dev/null
+++ b/sdks/java/testing/tpcds/README.md
@@ -0,0 +1,68 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# TPC-DS Benchmark
+
+## Google Dataflow Runner
+
+To execute TPC-DS benchmark for 1Gb dataset on Google Dataflow, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+  --runner=DataflowRunner \
+  --queries=3,26,55 \
+  --tpcParallel=2 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --project=apache-beam-testing \
+  --stagingLocation=gs://beamsql_tpcds_1/staging \
+  --tempLocation=gs://beamsql_tpcds_2/temp \
+  --region=us-west1 \
+  --maxNumWorkers=10"
+```
+
+To run a query using ZetaSQL planner (currently Query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+  --runner=DataflowRunner \
+  --queries=96 \
+  --tpcParallel=2 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+  --project=apache-beam-testing \
+  --stagingLocation=gs://beamsql_tpcds_1/staging \
+  --tempLocation=gs://beamsql_tpcds_2/temp \
+  --region=us-west1 \
+  --maxNumWorkers=10"
+```
+
+## Spark Runner
+
+To execute TPC-DS benchmark with Query3 for 1Gb dataset on Apache Spark 2.x, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:2" -Ptpcds.args=" \
+  --runner=SparkRunner \
+  --queries=3 \
+  --tpcParallel=1 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --dataSize=1G \
+  --resultsDirectory=/path/to/tpcds_results/"
+```
diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index 6237776..79fb1e8 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -33,7 +33,7 @@ def tpcdsArgsProperty = "tpcds.args"
 def tpcdsRunnerProperty = "tpcds.runner"
 def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
         ?: ":runners:direct-java"
-def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def shouldProvideSpark = ":runners:spark:2".equals(tpcdsRunnerDependency)
 def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
 def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
 
@@ -88,6 +88,15 @@ if (shouldProvideSpark) {
     }
 }
 
+// Execute the TPC-DS queries or suites via Gradle.
+//
+// Parameters:
+//   -Ptpcds.runner
+//       Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10"
+//       Defaults to ":runners:direct-java"
+//
+//   -Ptpcds.args
+//       Specify the command line for invoking org.apache.beam.sdk.tpcds.BeamTpcds
 task run(type: JavaExec) {
     def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
     def tpcdsArgsList = new ArrayList<String>()
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index 43b97d2..69e676f 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,16 +68,13 @@ public class BeamSqlEnvRunner {
   private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
 
   private static String buildTableCreateStatement(String tableName) {
-    String createStatement =
-        "CREATE EXTERNAL TABLE "
-            + tableName
-            + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
-    return createStatement;
+    return "CREATE EXTERNAL TABLE "
+        + tableName
+        + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
   }
 
   private static String buildDataLocation(String dataSize, String tableName) {
-    String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-    return dataLocation;
+    return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
   }
 
   /**
@@ -107,6 +106,7 @@ public class BeamSqlEnvRunner {
       String tableName = entry.getKey();
       String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
       Schema tableSchema = schemaMap.get(tableName);
+      checkArgumentNotNull(tableSchema, "Table schema can't be null for table: " + tableName);
       Table table =
           Table.builder()
               .name(tableName)
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index 6b25f65..3361453 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,23 +17,7 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-/**
- * To execute this main() method, run the following example command from the command line.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
- * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
- * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- *
- * <p>To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the
- * plannerName as below. If not specified, the default planner is Calcite.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
- * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
- * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- */
+/** Main driver program to run TPC-DS benchmark. */
 public class BeamTpcds {
   /**
    * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index ca4cf63..7b00a37 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -17,14 +17,6 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
 
@@ -33,12 +25,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
  * ';'), write the query as a string and return it.
  */
 public class QueryReader {
-  public static String readQuery(String queryFileName) throws Exception {
-    String path = "queries/" + queryFileName + ".sql";
-    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    return query;
-  }
-
   /**
    * Reads a query file (.sql), return the query as a string.
    *
@@ -47,38 +33,9 @@ public class QueryReader {
    * @return The query string stored in this file.
    * @throws Exception
    */
-  public static String readQuery2(String queryFileName) throws Exception {
-
-    // Prepare the file reader.
-    ClassLoader classLoader = QueryReader.class.getClassLoader();
-    if (classLoader == null) {
-      throw new RuntimeException("Can't get classloader from QueryReader.");
-    }
+  public static String readQuery(String queryFileName) throws Exception {
     String path = "queries/" + queryFileName + ".sql";
-
-    URL resource = classLoader.getResource(path);
-    if (resource == null) {
-      throw new RuntimeException("Resource for " + path + " can't be null.");
-    }
-    String queryFilePath = Objects.requireNonNull(resource).getPath();
-    File queryFile = new File(queryFilePath);
-    Reader fileReader =
-        new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
-    BufferedReader reader = new BufferedReader(fileReader);
-
-    // Read the file into stringBuilder.
-    StringBuilder stringBuilder = new StringBuilder();
-    String line;
-    String ls = System.getProperty("line.separator");
-    while ((line = reader.readLine()) != null) {
-      stringBuilder.append(line);
-      stringBuilder.append(ls);
-    }
-
-    // Delete the last new line separator.
-    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-    reader.close();
-
-    return stringBuilder.toString();
+    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    return query;
   }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 34274f9..4f56c1a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
  * queries.
  */
 public class SqlTransformRunner {
-  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
   private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
   private static final List<String> SUMMARY_HEADERS_LIST =
       Arrays.asList(
@@ -86,14 +84,13 @@ public class SqlTransformRunner {
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
       String tableName = tableSchema.getKey();
-      System.out.println("tableName = " + tableName);
 
       // Only when queryString contains tableName, the table is relevant to this query and will be
       // added. This can avoid reading unnecessary data files.
       if (queryString.contains(tableName)) {
         // This is location path where the data are stored
-        String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-        System.out.println("filePattern = " + filePattern);
+        String filePattern =
+            tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";
 
         PCollection<Row> table =
             new TextTable(
@@ -196,7 +193,7 @@ public class SqlTransformRunner {
             .apply(
                 TextIO.write()
                     .to(
-                        RESULT_DIRECTORY
+                        tpcdsOptions.getResultsDirectory()
                             + "/"
                             + dataSize
                             + "/"
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 3a2371d..b6f8733 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,16 +17,15 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import java.io.File;
-import java.net.URL;
+import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.ClassPath;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -36,12 +35,6 @@ import org.json.simple.parser.JSONParser;
  * table's schema into a string.
  */
 public class TableSchemaJSONLoader {
-  public static String readQuery(String tableName) throws Exception {
-    String path = "schemas/" + tableName + ".json";
-    String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    return fixture;
-  }
-
   /**
    * Read a table schema json file from resource/schemas directory, parse the file into a string
    * which can be utilized by BeamSqlEnv.executeDdl method.
@@ -55,7 +48,6 @@ public class TableSchemaJSONLoader {
   public static String parseTableSchema(String tableName) throws Exception {
     String path = "schemas/" + tableName + ".json";
     String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    System.out.println("schema = " + schema);
 
     JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
     JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
@@ -67,11 +59,11 @@ public class TableSchemaJSONLoader {
     StringBuilder schemaStringBuilder = new StringBuilder();
 
     Iterator jsonArrIterator = jsonArray.iterator();
-    Iterator<Map.Entry> recordIterator;
+    Iterator recordIterator;
     while (jsonArrIterator.hasNext()) {
       recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
       while (recordIterator.hasNext()) {
-        Map.Entry pair = recordIterator.next();
+        Map.Entry pair = (Map.Entry) recordIterator.next();
 
         if (pair.getKey().equals("type")) {
           // If the key of the pair is "type", make some modification before appending it to the
@@ -105,9 +97,7 @@ public class TableSchemaJSONLoader {
       schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
     }
 
-    String schemaString = schemaStringBuilder.toString();
-
-    return schemaString;
+    return schemaStringBuilder.toString();
   }
 
   /**
@@ -116,25 +106,20 @@ public class TableSchemaJSONLoader {
    *
    * @return The list of names of all tables.
    */
-  public static List<String> getAllTableNames() {
+  public static List<String> getAllTableNames() throws IOException, URISyntaxException {
     ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
     if (classLoader == null) {
       throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
     }
-    URL resource = classLoader.getResource("schemas");
-    if (resource == null) {
-      throw new RuntimeException("Resource for \"schemas\" can't be null.");
-    }
-    String tableDirPath = Objects.requireNonNull(resource).getPath();
-    File tableDir = new File(tableDirPath);
-    File[] tableDirListing = tableDir.listFiles();
+    ClassPath classPath = ClassPath.from(classLoader);
 
     List<String> tableNames = new ArrayList<>();
-
-    if (tableDirListing != null) {
-      for (File file : tableDirListing) {
-        // Remove the .json extension in file name
-        tableNames.add(FileNameUtils.getBaseName((file.getName())));
+    for (ClassPath.ResourceInfo resourceInfo : classPath.getResources()) {
+      String resourceName = resourceInfo.getResourceName();
+      if (resourceName.startsWith("schemas/")) {
+        String tableName =
+            resourceName.substring("schemas/".length(), resourceName.length() - ".json".length());
+        tableNames.add(tableName);
       }
     }
 
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index c693dfd..8e8b3e6 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -20,11 +20,13 @@ package org.apache.beam.sdk.tpcds;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
 
 /** Options used to configure TPC-DS test. */
 public interface TpcdsOptions extends PipelineOptions {
   @Description(
       "The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
+  @Validation.Required
   String getDataSize();
 
   void setDataSize(String dataSize);
@@ -41,4 +43,16 @@ public interface TpcdsOptions extends PipelineOptions {
   Integer getTpcParallel();
 
   void setTpcParallel(Integer parallelism);
+
+  @Description("The path to input data directory")
+  @Validation.Required
+  String getDataDirectory();
+
+  void setDataDirectory(String path);
+
+  @Description("The path to directory with results")
+  @Validation.Required
+  String getResultsDirectory();
+
+  void setResultsDirectory(String path);
 }
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 1d597f0..651d6f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -138,7 +140,7 @@ public class TableSchemaJSONLoaderTest {
   }
 
   @Test
-  public void testGetAllTableNames() {
+  public void testGetAllTableNames() throws IOException, URISyntaxException {
     List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
     Collections.sort(tableNames);
     List<String> expectedTableNames =