You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/13 12:23:24 UTC
[beam] 02/04: [BEAM-11712] Add options for input/output paths,
make it run via SparkRunner
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit a407d79680d01c35760f3fe4e76cd4192e34edd1
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 18:04:22 2021 +0200
[BEAM-11712] Add options for input/output paths, make it run via SparkRunner
---
sdks/java/testing/tpcds/README.md | 68 ++++++++++++++++++++++
sdks/java/testing/tpcds/build.gradle | 11 +++-
.../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java | 14 ++---
.../java/org/apache/beam/sdk/tpcds/BeamTpcds.java | 18 +-----
.../org/apache/beam/sdk/tpcds/QueryReader.java | 49 +---------------
.../apache/beam/sdk/tpcds/SqlTransformRunner.java | 9 +--
.../beam/sdk/tpcds/TableSchemaJSONLoader.java | 43 +++++---------
.../org/apache/beam/sdk/tpcds/TpcdsOptions.java | 14 +++++
.../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java | 4 +-
9 files changed, 123 insertions(+), 107 deletions(-)
diff --git a/sdks/java/testing/tpcds/README.md b/sdks/java/testing/tpcds/README.md
new file mode 100644
index 0000000..89f8073
--- /dev/null
+++ b/sdks/java/testing/tpcds/README.md
@@ -0,0 +1,68 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# TPC-DS Benchmark
+
+## Google Dataflow Runner
+
+To execute TPC-DS benchmark for 1Gb dataset on Google Dataflow, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+ --runner=DataflowRunner \
+ --queries=3,26,55 \
+ --tpcParallel=2 \
+ --dataDirectory=/path/to/tpcds_data/ \
+ --project=apache-beam-testing \
+ --stagingLocation=gs://beamsql_tpcds_1/staging \
+ --tempLocation=gs://beamsql_tpcds_2/temp \
+ --region=us-west1 \
+ --maxNumWorkers=10"
+```
+
+To run a query using ZetaSQL planner (currently Query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+ --runner=DataflowRunner \
+ --queries=96 \
+ --tpcParallel=2 \
+ --dataDirectory=/path/to/tpcds_data/ \
+ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+ --project=apache-beam-testing \
+ --stagingLocation=gs://beamsql_tpcds_1/staging \
+ --tempLocation=gs://beamsql_tpcds_2/temp \
+ --region=us-west1 \
+ --maxNumWorkers=10"
+```
+
+## Spark Runner
+
+To execute TPC-DS benchmark with Query3 for 1Gb dataset on Apache Spark 2.x, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:2" -Ptpcds.args=" \
+ --runner=SparkRunner \
+ --queries=3 \
+ --tpcParallel=1 \
+ --dataDirectory=/path/to/tpcds_data/ \
+ --dataSize=1G \
+ --resultsDirectory=/path/to/tpcds_results/"
+```
diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index 6237776..79fb1e8 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -33,7 +33,7 @@ def tpcdsArgsProperty = "tpcds.args"
def tpcdsRunnerProperty = "tpcds.runner"
def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
?: ":runners:direct-java"
-def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def shouldProvideSpark = ":runners:spark:2".equals(tpcdsRunnerDependency)
def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
@@ -88,6 +88,15 @@ if (shouldProvideSpark) {
}
}
+// Execute the TPC-DS queries or suites via Gradle.
+//
+// Parameters:
+// -Ptpcds.runner
+// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10"
+// Defaults to ":runners:direct-java"
+//
+// -Ptpcds.args
+// Specify the command line for invoking org.apache.beam.sdk.tpcds.BeamTpcds
task run(type: JavaExec) {
def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
def tpcdsArgsList = new ArrayList<String>()
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index 43b97d2..69e676f 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.tpcds;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Arrays;
@@ -66,16 +68,13 @@ public class BeamSqlEnvRunner {
private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
private static String buildTableCreateStatement(String tableName) {
- String createStatement =
- "CREATE EXTERNAL TABLE "
- + tableName
- + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
- return createStatement;
+ return "CREATE EXTERNAL TABLE "
+ + tableName
+ + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
}
private static String buildDataLocation(String dataSize, String tableName) {
- String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
- return dataLocation;
+ return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
}
/**
@@ -107,6 +106,7 @@ public class BeamSqlEnvRunner {
String tableName = entry.getKey();
String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
Schema tableSchema = schemaMap.get(tableName);
+ checkArgumentNotNull(tableSchema, "Table schema can't be null for table: " + tableName);
Table table =
Table.builder()
.name(tableName)
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index 6b25f65..3361453 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,23 +17,7 @@
*/
package org.apache.beam.sdk.tpcds;
-/**
- * To execute this main() method, run the following example command from the command line.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
- * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
- * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- *
- * <p>To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the
- * plannerName as below. If not specified, the default planner is Calcite.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
- * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
- * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- */
+/** Main driver program to run TPC-DS benchmark. */
public class BeamTpcds {
/**
* The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index ca4cf63..7b00a37 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -17,14 +17,6 @@
*/
package org.apache.beam.sdk.tpcds;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
@@ -33,12 +25,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
* ';'), write the query as a string and return it.
*/
public class QueryReader {
- public static String readQuery(String queryFileName) throws Exception {
- String path = "queries/" + queryFileName + ".sql";
- String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
- return query;
- }
-
/**
* Reads a query file (.sql), return the query as a string.
*
@@ -47,38 +33,9 @@ public class QueryReader {
* @return The query string stored in this file.
* @throws Exception
*/
- public static String readQuery2(String queryFileName) throws Exception {
-
- // Prepare the file reader.
- ClassLoader classLoader = QueryReader.class.getClassLoader();
- if (classLoader == null) {
- throw new RuntimeException("Can't get classloader from QueryReader.");
- }
+ public static String readQuery(String queryFileName) throws Exception {
String path = "queries/" + queryFileName + ".sql";
-
- URL resource = classLoader.getResource(path);
- if (resource == null) {
- throw new RuntimeException("Resource for " + path + " can't be null.");
- }
- String queryFilePath = Objects.requireNonNull(resource).getPath();
- File queryFile = new File(queryFilePath);
- Reader fileReader =
- new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
- BufferedReader reader = new BufferedReader(fileReader);
-
- // Read the file into stringBuilder.
- StringBuilder stringBuilder = new StringBuilder();
- String line;
- String ls = System.getProperty("line.separator");
- while ((line = reader.readLine()) != null) {
- stringBuilder.append(line);
- stringBuilder.append(ls);
- }
-
- // Delete the last new line separator.
- stringBuilder.deleteCharAt(stringBuilder.length() - 1);
- reader.close();
-
- return stringBuilder.toString();
+ String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+ return query;
}
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 34274f9..4f56c1a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
* queries.
*/
public class SqlTransformRunner {
- private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
- private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
private static final List<String> SUMMARY_HEADERS_LIST =
Arrays.asList(
@@ -86,14 +84,13 @@ public class SqlTransformRunner {
PCollectionTuple tables = PCollectionTuple.empty(pipeline);
for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
String tableName = tableSchema.getKey();
- System.out.println("tableName = " + tableName);
// Only when queryString contains tableName, the table is relevant to this query and will be
// added. This can avoid reading unnecessary data files.
if (queryString.contains(tableName)) {
// This is location path where the data are stored
- String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
- System.out.println("filePattern = " + filePattern);
+ String filePattern =
+ tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";
PCollection<Row> table =
new TextTable(
@@ -196,7 +193,7 @@ public class SqlTransformRunner {
.apply(
TextIO.write()
.to(
- RESULT_DIRECTORY
+ tpcdsOptions.getResultsDirectory()
+ "/"
+ dataSize
+ "/"
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 3a2371d..b6f8733 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,16 +17,15 @@
*/
package org.apache.beam.sdk.tpcds;
-import java.io.File;
-import java.net.URL;
+import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.ClassPath;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -36,12 +35,6 @@ import org.json.simple.parser.JSONParser;
* table's schema into a string.
*/
public class TableSchemaJSONLoader {
- public static String readQuery(String tableName) throws Exception {
- String path = "schemas/" + tableName + ".json";
- String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
- return fixture;
- }
-
/**
* Read a table schema json file from resource/schemas directory, parse the file into a string
* which can be utilized by BeamSqlEnv.executeDdl method.
@@ -55,7 +48,6 @@ public class TableSchemaJSONLoader {
public static String parseTableSchema(String tableName) throws Exception {
String path = "schemas/" + tableName + ".json";
String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
- System.out.println("schema = " + schema);
JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
@@ -67,11 +59,11 @@ public class TableSchemaJSONLoader {
StringBuilder schemaStringBuilder = new StringBuilder();
Iterator jsonArrIterator = jsonArray.iterator();
- Iterator<Map.Entry> recordIterator;
+ Iterator recordIterator;
while (jsonArrIterator.hasNext()) {
recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
while (recordIterator.hasNext()) {
- Map.Entry pair = recordIterator.next();
+ Map.Entry pair = (Map.Entry) recordIterator.next();
if (pair.getKey().equals("type")) {
// If the key of the pair is "type", make some modification before appending it to the
@@ -105,9 +97,7 @@ public class TableSchemaJSONLoader {
schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
}
- String schemaString = schemaStringBuilder.toString();
-
- return schemaString;
+ return schemaStringBuilder.toString();
}
/**
@@ -116,25 +106,20 @@ public class TableSchemaJSONLoader {
*
* @return The list of names of all tables.
*/
- public static List<String> getAllTableNames() {
+ public static List<String> getAllTableNames() throws IOException, URISyntaxException {
ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
if (classLoader == null) {
throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
}
- URL resource = classLoader.getResource("schemas");
- if (resource == null) {
- throw new RuntimeException("Resource for \"schemas\" can't be null.");
- }
- String tableDirPath = Objects.requireNonNull(resource).getPath();
- File tableDir = new File(tableDirPath);
- File[] tableDirListing = tableDir.listFiles();
+ ClassPath classPath = ClassPath.from(classLoader);
List<String> tableNames = new ArrayList<>();
-
- if (tableDirListing != null) {
- for (File file : tableDirListing) {
- // Remove the .json extension in file name
- tableNames.add(FileNameUtils.getBaseName((file.getName())));
+ for (ClassPath.ResourceInfo resourceInfo : classPath.getResources()) {
+ String resourceName = resourceInfo.getResourceName();
+ if (resourceName.startsWith("schemas/")) {
+ String tableName =
+ resourceName.substring("schemas/".length(), resourceName.length() - ".json".length());
+ tableNames.add(tableName);
}
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index c693dfd..8e8b3e6 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -20,11 +20,13 @@ package org.apache.beam.sdk.tpcds;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
/** Options used to configure TPC-DS test. */
public interface TpcdsOptions extends PipelineOptions {
@Description(
"The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
+ @Validation.Required
String getDataSize();
void setDataSize(String dataSize);
@@ -41,4 +43,16 @@ public interface TpcdsOptions extends PipelineOptions {
Integer getTpcParallel();
void setTpcParallel(Integer parallelism);
+
+ @Description("The path to input data directory")
+ @Validation.Required
+ String getDataDirectory();
+
+ void setDataDirectory(String path);
+
+ @Description("The path to directory with results")
+ @Validation.Required
+ String getResultsDirectory();
+
+ void setResultsDirectory(String path);
}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 1d597f0..651d6f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.tpcds;
import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -138,7 +140,7 @@ public class TableSchemaJSONLoaderTest {
}
@Test
- public void testGetAllTableNames() {
+ public void testGetAllTableNames() throws IOException, URISyntaxException {
List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
Collections.sort(tableNames);
List<String> expectedTableNames =