Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/02 19:12:08 UTC

[GitHub] [beam] amaliujia commented on a change in pull request #12436: [BEAM-9891] TPC-DS module initialization, tables and queries stored

amaliujia commented on a change in pull request #12436:
URL: https://github.com/apache/beam/pull/12436#discussion_r464112377



##########
File path: sdks/java/extensions/sql/build.gradle
##########
@@ -56,6 +56,7 @@ dependencies {
   compile "com.alibaba:fastjson:1.2.68"
   compile "org.codehaus.janino:janino:3.0.11"
   compile "org.codehaus.janino:commons-compiler:3.0.11"
+  compile project(path: ":runners:google-cloud-dataflow-java")

Review comment:
       Do you need to change the SQL module's gradle file? The benchmarking tool is built on top of SQL, so SQL should just be a library dependency here, right?
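
       A minimal sketch of the placement the comment implies, assuming the
       benchmark module declares its own dependencies in
       sdks/java/testing/tpcds/build.gradle (a sketch, not the PR's actual
       build file):

           dependencies {
             // The benchmark consumes SQL as a library and brings its own runner;
             // the SQL module's build.gradle stays unchanged.
             compile project(path: ":sdks:java:extensions:sql")
             compile project(path: ":runners:google-cloud-dataflow-java")
           }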

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
##########
@@ -149,13 +149,35 @@ public static Object autoCastField(Schema.Field field, Object rawObj) {
         case INT16:
           return Short.valueOf(raw);
         case INT32:
+          if (raw.equals("")) {

Review comment:
       It seems to me that this is not the right place to handle the "" to null conversion.
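
       One hedged alternative the comment suggests: normalize empty CSV fields
       to null once, before the per-type cast, instead of repeating the check
       in every switch case. The helper below is a sketch, not existing Beam
       API:

           import org.apache.beam.sdk.schemas.Schema;

           class CsvFieldNormalizer {
             // Hypothetical helper: map an empty CSV string to null for
             // nullable fields before autoCastField runs, so the cast switch
             // needs no per-type empty-string handling.
             static Object normalizeEmptyToNull(Schema.Field field, Object rawObj) {
               if (rawObj instanceof String
                   && ((String) rawObj).isEmpty()
                   && field.getType().getNullable()) {
                 return null;
               }
               return rawObj;
             }
           }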

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
##########
@@ -149,13 +149,35 @@ public static Object autoCastField(Schema.Field field, Object rawObj) {
         case INT16:
           return Short.valueOf(raw);
         case INT32:
+          if (raw.equals("")) {
+            return null;
+          }
           return Integer.valueOf(raw);
         case INT64:
+          if (raw.equals("")) {
+            return null;
+          }
           return Long.valueOf(raw);
         case FLOAT:
+          if (raw.equals("")) {
+            return null;
+          }
           return Float.valueOf(raw);
         case DOUBLE:
+          if (raw.equals("")) {
+            return null;
+          }
           return Double.valueOf(raw);
+          //          BigDecimal bdvalue = new BigDecimal(raw);

Review comment:
       Remove this leftover commented-out code.

##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.tpcds;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import java.util.List;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * To execute this main() method, run the following example command from the command line.
+ *
+ * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+ *         --queries=3,26,55 \
+ *         --tpcParallel=2 \
+ *         --project=apache-beam-testing \
+ *         --stagingLocation=gs://beamsql_tpcds_1/staging \
+ *         --tempLocation=gs://beamsql_tpcds_2/temp \
+ *         --runner=DataflowRunner \
+ *         --region=us-west1 \
+ *         --maxNumWorkers=10"
+ */
+public class BeamTpcds {
+    public static void main(String[] args) throws Exception {
+        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+        inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+        TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+        // Use an ExecutorService with a CompletionService to run the query pipelines concurrently
+        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+        CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+
+        // After getting necessary parameters from tpcdsOptions, cast tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for pipeline execution.
+        DataflowPipelineOptions dataflowPipelineOptions = tpcdsOptions.as(DataflowPipelineOptions.class);
+
+        BeamSqlEnv env =
+                BeamSqlEnv
+                        .builder(inMemoryMetaStore)
+                        .setPipelineOptions(dataflowPipelineOptions)
+                        .build();
+
+        // Register all tables, set their schemas, and set the locations where their corresponding data are stored.
+        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+        for (String tableName : tableNames) {
+            String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
+            String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+            String dataLocation = "gs://beamsql_tpcds_1/data/" + dataSize +"/" + tableName + ".dat";
+            env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
+        }
+
+        // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+        Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+        // Execute all queries, transform each result into a PCollection<String>, and write each to a txt file stored in a GCS directory.
+        for (int i = 0; i < queryNameArr.length; i++) {
+            // For each query, get a copy of pipelineOptions from command line arguments, set a unique job name using the time stamp so that multiple different pipelines can run together.
+            TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+            DataflowPipelineOptions dataflowPipelineOptionsCopy = tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
+            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
+
+            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+            String queryString = QueryReader.readQuery(queryNameArr[i]);
+
+            // Query execution
+            PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+
+            // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
+            PCollection<String> rowStrings = rows.apply(MapElements
+                    .into(TypeDescriptors.strings())
+                    .via((Row row) -> row.toString()));
+            rowStrings.apply(TextIO.write().to("gs://beamsql_tpcds_1/tpcds_results/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));

Review comment:
       Can you extract paths like `gs://beamsql_tpcds_1/tpcds_results/` into a static public final String constant?
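
       A minimal sketch of the suggested extraction; the constant names are
       hypothetical, only the gs:// prefixes come from the PR:

           public class BeamTpcds {
             private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data/";
             private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results/";

             // The write step would then read:
             // rowStrings.apply(TextIO.write()
             //     .to(RESULT_DIRECTORY + dataSize + "/" + pipelines[i].getOptions().getJobName())
             //     .withSuffix(".txt")
             //     .withNumShards(1));
           }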




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org