Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/13 12:23:22 UTC

[beam] branch master updated (f805f1c -> b3ef203)

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from f805f1c  Merge pull request #14499 from [BEAM-11408, BEAM-11772] Add explicit output typehints to ensure coder determinism for BQ with auto-sharding
     new 28eec3f  [BEAM-11712] Make up-to-date build file and codestyle
     new a407d79  [BEAM-11712] Add options for input/output paths, make it run via SparkRunner
     new 8fe0c5c  [BEAM-11712] Fix static analysis warnings and typos on TPC-DS module
     new b3ef203  Merge pull request #14373: [BEAM-11712] Run TPC-DS via BeamSQL and Spark runner

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/java/testing/tpcds/README.md                  |   68 +
 sdks/java/testing/tpcds/build.gradle               |  108 +-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    |  327 +++--
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  |   50 +-
 .../java/org/apache/beam/sdk/tpcds/CsvToRow.java   |   47 +-
 .../org/apache/beam/sdk/tpcds/QueryReader.java     |   51 +-
 .../java/org/apache/beam/sdk/tpcds/RowToCsv.java   |   38 +-
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  314 +++--
 .../apache/beam/sdk/tpcds/SummaryGenerator.java    |  219 ++--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      |  162 +--
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    |   40 +-
 .../beam/sdk/tpcds/TpcdsOptionsRegistrar.java      |   10 +-
 .../beam/sdk/tpcds/TpcdsParametersReader.java      |  136 +-
 .../java/org/apache/beam/sdk/tpcds/TpcdsRun.java   |   54 +-
 .../org/apache/beam/sdk/tpcds/TpcdsRunResult.java  |  120 +-
 .../org/apache/beam/sdk/tpcds/TpcdsSchemas.java    | 1336 ++++++++++----------
 .../org/apache/beam/sdk/tpcds}/package-info.java   |    4 +-
 .../org/apache/beam/sdk/tpcds/QueryReaderTest.java |  361 +++---
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  261 ++--
 .../beam/sdk/tpcds/TpcdsParametersReaderTest.java  |  110 +-
 .../apache/beam/sdk/tpcds/TpcdsSchemasTest.java    |  183 ++-
 21 files changed, 2159 insertions(+), 1840 deletions(-)
 create mode 100644 sdks/java/testing/tpcds/README.md
 copy sdks/java/testing/{load-tests/src/main/java/org/apache/beam/sdk/loadtests => tpcds/src/main/java/org/apache/beam/sdk/tpcds}/package-info.java (92%)

[beam] 02/04: [BEAM-11712] Add options for input/output paths, make it run via SparkRunner

Posted by ie...@apache.org.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a407d79680d01c35760f3fe4e76cd4192e34edd1
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 18:04:22 2021 +0200

    [BEAM-11712] Add options for input/output paths, make it run via SparkRunner
---
 sdks/java/testing/tpcds/README.md                  | 68 ++++++++++++++++++++++
 sdks/java/testing/tpcds/build.gradle               | 11 +++-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    | 14 ++---
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  | 18 +-----
 .../org/apache/beam/sdk/tpcds/QueryReader.java     | 49 +---------------
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  9 +--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      | 43 +++++---------
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    | 14 +++++
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  4 +-
 9 files changed, 123 insertions(+), 107 deletions(-)

diff --git a/sdks/java/testing/tpcds/README.md b/sdks/java/testing/tpcds/README.md
new file mode 100644
index 0000000..89f8073
--- /dev/null
+++ b/sdks/java/testing/tpcds/README.md
@@ -0,0 +1,68 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# TPC-DS Benchmark
+
+## Google Dataflow Runner
+
+To execute the TPC-DS benchmark for a 1GB dataset on Google Dataflow, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+  --runner=DataflowRunner \
+  --queries=3,26,55 \
+  --tpcParallel=2 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --project=apache-beam-testing \
+  --stagingLocation=gs://beamsql_tpcds_1/staging \
+  --tempLocation=gs://beamsql_tpcds_2/temp \
+  --region=us-west1 \
+  --maxNumWorkers=10"
+```
+
+To run a query using the ZetaSQL planner (currently only Query96 can be run with ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
+  --runner=DataflowRunner \
+  --queries=96 \
+  --tpcParallel=2 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+  --project=apache-beam-testing \
+  --stagingLocation=gs://beamsql_tpcds_1/staging \
+  --tempLocation=gs://beamsql_tpcds_2/temp \
+  --region=us-west1 \
+  --maxNumWorkers=10"
+```
+
+## Spark Runner
+
+To execute the TPC-DS benchmark with Query3 for a 1GB dataset on Apache Spark 2.x, run the following example command from the command line:
+
+```bash
+./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:2" -Ptpcds.args=" \
+  --runner=SparkRunner \
+  --queries=3 \
+  --tpcParallel=1 \
+  --dataDirectory=/path/to/tpcds_data/ \
+  --dataSize=1G \
+  --resultsDirectory=/path/to/tpcds_results/"
+```
diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index 6237776..79fb1e8 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -33,7 +33,7 @@ def tpcdsArgsProperty = "tpcds.args"
 def tpcdsRunnerProperty = "tpcds.runner"
 def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
         ?: ":runners:direct-java"
-def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def shouldProvideSpark = ":runners:spark:2".equals(tpcdsRunnerDependency)
 def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
 def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
 
@@ -88,6 +88,15 @@ if (shouldProvideSpark) {
     }
 }
 
+// Execute the TPC-DS queries or suites via Gradle.
+//
+// Parameters:
+//   -Ptpcds.runner
+//       Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10"
+//       Defaults to ":runners:direct-java"
+//
+//   -Ptpcds.args
+//       Specify the command line for invoking org.apache.beam.sdk.tpcds.BeamTpcds
 task run(type: JavaExec) {
     def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
     def tpcdsArgsList = new ArrayList<String>()
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index 43b97d2..69e676f 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,16 +68,13 @@ public class BeamSqlEnvRunner {
   private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
 
   private static String buildTableCreateStatement(String tableName) {
-    String createStatement =
-        "CREATE EXTERNAL TABLE "
-            + tableName
-            + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
-    return createStatement;
+    return "CREATE EXTERNAL TABLE "
+        + tableName
+        + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
   }
 
   private static String buildDataLocation(String dataSize, String tableName) {
-    String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-    return dataLocation;
+    return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
   }
 
   /**
@@ -107,6 +106,7 @@ public class BeamSqlEnvRunner {
       String tableName = entry.getKey();
       String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
       Schema tableSchema = schemaMap.get(tableName);
+      checkArgumentNotNull(tableSchema, "Table schema can't be null for table: " + tableName);
       Table table =
           Table.builder()
               .name(tableName)
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index 6b25f65..3361453 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,23 +17,7 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-/**
- * To execute this main() method, run the following example command from the command line.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
- * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
- * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- *
- * <p>To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the
- * plannerName as below. If not specified, the default planner is Calcite.
- *
- * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
- * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
- * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
- * --maxNumWorkers=10"
- */
+/** Main driver program to run TPC-DS benchmark. */
 public class BeamTpcds {
   /**
    * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index ca4cf63..7b00a37 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -17,14 +17,6 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
 
@@ -33,12 +25,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
  * ';'), write the query as a string and return it.
  */
 public class QueryReader {
-  public static String readQuery(String queryFileName) throws Exception {
-    String path = "queries/" + queryFileName + ".sql";
-    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    return query;
-  }
-
   /**
    * Reads a query file (.sql), return the query as a string.
    *
@@ -47,38 +33,9 @@ public class QueryReader {
    * @return The query string stored in this file.
    * @throws Exception
    */
-  public static String readQuery2(String queryFileName) throws Exception {
-
-    // Prepare the file reader.
-    ClassLoader classLoader = QueryReader.class.getClassLoader();
-    if (classLoader == null) {
-      throw new RuntimeException("Can't get classloader from QueryReader.");
-    }
+  public static String readQuery(String queryFileName) throws Exception {
     String path = "queries/" + queryFileName + ".sql";
-
-    URL resource = classLoader.getResource(path);
-    if (resource == null) {
-      throw new RuntimeException("Resource for " + path + " can't be null.");
-    }
-    String queryFilePath = Objects.requireNonNull(resource).getPath();
-    File queryFile = new File(queryFilePath);
-    Reader fileReader =
-        new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
-    BufferedReader reader = new BufferedReader(fileReader);
-
-    // Read the file into stringBuilder.
-    StringBuilder stringBuilder = new StringBuilder();
-    String line;
-    String ls = System.getProperty("line.separator");
-    while ((line = reader.readLine()) != null) {
-      stringBuilder.append(line);
-      stringBuilder.append(ls);
-    }
-
-    // Delete the last new line separator.
-    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-    reader.close();
-
-    return stringBuilder.toString();
+    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    return query;
   }
 }
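The simplified readQuery() above delegates the whole file read to Guava's Resources helper. The same behavior can be sketched with only the JDK; ResourceReader and the example resource path below are illustrative, not part of the Beam module:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ResourceReader {
  // JDK-only sketch of the simplified readQuery(): resolve a classpath
  // resource and read it fully as UTF-8. Guava's
  // Resources.toString(Resources.getResource(path), Charsets.UTF_8)
  // collapses this into a single call.
  public static String read(String path) throws IOException {
    ClassLoader classLoader = ResourceReader.class.getClassLoader();
    try (InputStream in = classLoader.getResourceAsStream(path)) {
      if (in == null) {
        throw new IOException("Resource not found on classpath: " + path);
      }
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }
}
```

Like Resources.getResource, this fails loudly when the resource is missing instead of returning a partial result.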
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 34274f9..4f56c1a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
  * queries.
  */
 public class SqlTransformRunner {
-  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
   private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
   private static final List<String> SUMMARY_HEADERS_LIST =
       Arrays.asList(
@@ -86,14 +84,13 @@ public class SqlTransformRunner {
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
       String tableName = tableSchema.getKey();
-      System.out.println("tableName = " + tableName);
 
       // Only when queryString contains tableName, the table is relevant to this query and will be
       // added. This can avoid reading unnecessary data files.
       if (queryString.contains(tableName)) {
         // This is location path where the data are stored
-        String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-        System.out.println("filePattern = " + filePattern);
+        String filePattern =
+            tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";
 
         PCollection<Row> table =
             new TextTable(
@@ -196,7 +193,7 @@ public class SqlTransformRunner {
             .apply(
                 TextIO.write()
                     .to(
-                        RESULT_DIRECTORY
+                        tpcdsOptions.getResultsDirectory()
                             + "/"
                             + dataSize
                             + "/"
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 3a2371d..b6f8733 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,16 +17,15 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import java.io.File;
-import java.net.URL;
+import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.ClassPath;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -36,12 +35,6 @@ import org.json.simple.parser.JSONParser;
  * table's schema into a string.
  */
 public class TableSchemaJSONLoader {
-  public static String readQuery(String tableName) throws Exception {
-    String path = "schemas/" + tableName + ".json";
-    String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    return fixture;
-  }
-
   /**
    * Read a table schema json file from resource/schemas directory, parse the file into a string
    * which can be utilized by BeamSqlEnv.executeDdl method.
@@ -55,7 +48,6 @@ public class TableSchemaJSONLoader {
   public static String parseTableSchema(String tableName) throws Exception {
     String path = "schemas/" + tableName + ".json";
     String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    System.out.println("schema = " + schema);
 
     JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
     JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
@@ -67,11 +59,11 @@ public class TableSchemaJSONLoader {
     StringBuilder schemaStringBuilder = new StringBuilder();
 
     Iterator jsonArrIterator = jsonArray.iterator();
-    Iterator<Map.Entry> recordIterator;
+    Iterator recordIterator;
     while (jsonArrIterator.hasNext()) {
       recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
       while (recordIterator.hasNext()) {
-        Map.Entry pair = recordIterator.next();
+        Map.Entry pair = (Map.Entry) recordIterator.next();
 
         if (pair.getKey().equals("type")) {
           // If the key of the pair is "type", make some modification before appending it to the
@@ -105,9 +97,7 @@ public class TableSchemaJSONLoader {
       schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
     }
 
-    String schemaString = schemaStringBuilder.toString();
-
-    return schemaString;
+    return schemaStringBuilder.toString();
   }
 
   /**
@@ -116,25 +106,20 @@ public class TableSchemaJSONLoader {
    *
    * @return The list of names of all tables.
    */
-  public static List<String> getAllTableNames() {
+  public static List<String> getAllTableNames() throws IOException, URISyntaxException {
     ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
     if (classLoader == null) {
       throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
     }
-    URL resource = classLoader.getResource("schemas");
-    if (resource == null) {
-      throw new RuntimeException("Resource for \"schemas\" can't be null.");
-    }
-    String tableDirPath = Objects.requireNonNull(resource).getPath();
-    File tableDir = new File(tableDirPath);
-    File[] tableDirListing = tableDir.listFiles();
+    ClassPath classPath = ClassPath.from(classLoader);
 
     List<String> tableNames = new ArrayList<>();
-
-    if (tableDirListing != null) {
-      for (File file : tableDirListing) {
-        // Remove the .json extension in file name
-        tableNames.add(FileNameUtils.getBaseName((file.getName())));
+    for (ClassPath.ResourceInfo resourceInfo : classPath.getResources()) {
+      String resourceName = resourceInfo.getResourceName();
+      if (resourceName.startsWith("schemas/")) {
+        String tableName =
+            resourceName.substring("schemas/".length(), resourceName.length() - ".json".length());
+        tableNames.add(tableName);
       }
     }
 
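The new getAllTableNames() scans the classpath with Guava's ClassPath instead of listing files on disk, which also works when the schemas live inside a jar. The prefix/suffix stripping it applies to each resource name can be sketched with plain JDK code; SchemaNames and the sample names are hypothetical, and this sketch additionally checks the ".json" suffix that the loop above assumes:

```java
import java.util.ArrayList;
import java.util.List;

public class SchemaNames {
  // Sketch of the name-extraction step: keep resources under "schemas/",
  // then strip the "schemas/" prefix and ".json" suffix to get table names.
  public static List<String> tableNames(List<String> resourceNames) {
    List<String> tableNames = new ArrayList<>();
    for (String resourceName : resourceNames) {
      if (resourceName.startsWith("schemas/") && resourceName.endsWith(".json")) {
        tableNames.add(resourceName.substring(
            "schemas/".length(), resourceName.length() - ".json".length()));
      }
    }
    return tableNames;
  }
}
```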
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index c693dfd..8e8b3e6 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -20,11 +20,13 @@ package org.apache.beam.sdk.tpcds;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
 
 /** Options used to configure TPC-DS test. */
 public interface TpcdsOptions extends PipelineOptions {
   @Description(
       "The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
+  @Validation.Required
   String getDataSize();
 
   void setDataSize(String dataSize);
@@ -41,4 +43,16 @@ public interface TpcdsOptions extends PipelineOptions {
   Integer getTpcParallel();
 
   void setTpcParallel(Integer parallelism);
+
+  @Description("The path to input data directory")
+  @Validation.Required
+  String getDataDirectory();
+
+  void setDataDirectory(String path);
+
+  @Description("The path to directory with results")
+  @Validation.Required
+  String getResultsDirectory();
+
+  void setResultsDirectory(String path);
 }
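With the new required dataDirectory and resultsDirectory options, the hard-coded GCS constants are dropped and file patterns are built from option values at runtime. A minimal sketch of that construction (DataPaths and the argument values are illustrative):

```java
public class DataPaths {
  // Sketch of the file-pattern construction after this change: the base
  // directory comes from TpcdsOptions.getDataDirectory() rather than a
  // hard-coded bucket, so any filesystem Beam supports can be used.
  public static String filePattern(String dataDirectory, String dataSize, String tableName) {
    return dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
  }
}
```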
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 1d597f0..651d6f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -138,7 +140,7 @@ public class TableSchemaJSONLoaderTest {
   }
 
   @Test
-  public void testGetAllTableNames() {
+  public void testGetAllTableNames() throws IOException, URISyntaxException {
     List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
     Collections.sort(tableNames);
     List<String> expectedTableNames =

[beam] 01/04: [BEAM-11712] Make up-to-date build file and codestyle

Posted by ie...@apache.org.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 28eec3f9d484fb7e9484c8f8c57c30a4fdaf867b
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 11:16:39 2021 +0200

    [BEAM-11712] Make up-to-date build file and codestyle
---
 sdks/java/testing/tpcds/build.gradle               |   99 +-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    |  328 +++--
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  |   54 +-
 .../java/org/apache/beam/sdk/tpcds/CsvToRow.java   |   47 +-
 .../org/apache/beam/sdk/tpcds/QueryReader.java     |   87 +-
 .../java/org/apache/beam/sdk/tpcds/RowToCsv.java   |   38 +-
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  317 +++--
 .../apache/beam/sdk/tpcds/SummaryGenerator.java    |  219 ++--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      |  171 ++-
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    |   26 +-
 .../beam/sdk/tpcds/TpcdsOptionsRegistrar.java      |   10 +-
 .../beam/sdk/tpcds/TpcdsParametersReader.java      |  136 +-
 .../java/org/apache/beam/sdk/tpcds/TpcdsRun.java   |   54 +-
 .../org/apache/beam/sdk/tpcds/TpcdsRunResult.java  |  123 +-
 .../org/apache/beam/sdk/tpcds/TpcdsSchemas.java    | 1338 ++++++++++----------
 ...pcdsOptionsRegistrar.java => package-info.java} |   16 +-
 .../org/apache/beam/sdk/tpcds/QueryReaderTest.java |  361 +++---
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  260 ++--
 .../beam/sdk/tpcds/TpcdsParametersReaderTest.java  |  110 +-
 .../apache/beam/sdk/tpcds/TpcdsSchemasTest.java    |  183 ++-
 20 files changed, 2138 insertions(+), 1839 deletions(-)

diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index c52ec7a..6237776 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -16,38 +16,97 @@
  * limitations under the License.
  */
 
-plugins {
-    id 'java'
-}
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.tpcds',
+        exportJavadoc: false,
+        archivesBaseName: 'beam-sdks-java-tpcds',
+)
 
-description = "Apache Beam :: SDKs :: Java :: TPC-DS Benchark"
+description = "Apache Beam :: SDKs :: Java :: TPC-DS"
 
-version '2.24.0-SNAPSHOT'
+// When running via Gradle, this property can be used to pass commandline arguments
+// to the TPC-DS run
+def tpcdsArgsProperty = "tpcds.args"
 
-sourceCompatibility = 1.8
+// When running via Gradle, this property sets the runner dependency
+def tpcdsRunnerProperty = "tpcds.runner"
+def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
+        ?: ":runners:direct-java"
+def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
+def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
+
+if (isDataflowRunner) {
+    /*
+     * We need to rely on manually specifying these evaluationDependsOn to ensure that
+     * the following projects are evaluated before we evaluate this project. This is because
+     * we are attempting to reference a property from the project directly.
+     */
+    evaluationDependsOn(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+}
 
-repositories {
-    mavenCentral()
+configurations {
+    // A configuration for running the TPC-DS launcher directly from Gradle, which
+    // uses Gradle to put the appropriate dependencies on the Classpath rather than
+    // bundling them into a fat jar
+    gradleRun
 }
 
 dependencies {
-    compile 'com.googlecode.json-simple:json-simple:1.1.1'
-    compile project(path: ":sdks:java:core", configuration: "shadow")
-    compile project(path: ":runners:google-cloud-dataflow-java")
-    compile project(":sdks:java:io:google-cloud-platform")
+    compile library.java.vendored_guava_26_0_jre
+    compile library.java.vendored_calcite_1_20_0
+    compile library.java.commons_csv
+    compile library.java.slf4j_api
+    compile "com.googlecode.json-simple:json-simple:1.1.1"
+    compile "com.alibaba:fastjson:1.2.69"
     compile project(":sdks:java:extensions:sql")
     compile project(":sdks:java:extensions:sql:zetasql")
-    compile group: 'com.google.auto.service', name: 'auto-service', version: '1.0-rc1'
-    testCompile group: 'junit', name: 'junit', version: '4.12'
+    compile project(path: ":runners:google-cloud-dataflow-java")
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    testRuntimeClasspath library.java.slf4j_jdk14
+    testCompile project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntime")
+    gradleRun project(project.path)
+    gradleRun project(path: tpcdsRunnerDependency, configuration: runnerConfiguration)
+
+    // The Spark runner requires the user to provide a Spark dependency. For self-contained
+    // runs with the Spark runner, we can provide such a dependency. This is deliberately phrased
+    // to not hardcode any runner other than :runners:direct-java
+    if (shouldProvideSpark) {
+        gradleRun library.java.spark_core, {
+            exclude group:"org.slf4j", module:"jul-to-slf4j"
+        }
+        gradleRun library.java.spark_sql
+        gradleRun library.java.spark_streaming
+    }
 }
 
-// When running via Gradle, this property can be used to pass commandline arguments
-// to the tpcds run
-def tpcdsArgsProperty = "tpcds.args"
+if (shouldProvideSpark) {
+    configurations.gradleRun {
+        // Using Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
+        exclude group: "org.slf4j", module: "slf4j-jdk14"
+    }
+}
 
 task run(type: JavaExec) {
-    main = "org.apache.beam.sdk.tpcds.BeamTpcds"
-    classpath = sourceSets.main.runtimeClasspath
     def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
-    args tpcdsArgsStr.split()
+    def tpcdsArgsList = new ArrayList<String>()
+    Collections.addAll(tpcdsArgsList, tpcdsArgsStr.split())
+
+    if (isDataflowRunner) {
+        dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
+
+        def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
+                project(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+                        .shadowJar.archivePath
+        // Provide job with a customizable worker jar.
+        // With legacy worker jar, containerImage is set to empty (i.e. to use the internal build).
+        // More context and discussions can be found in PR#6694.
+        tpcdsArgsList.add("--dataflowWorkerJar=${dataflowWorkerJar}".toString())
+        tpcdsArgsList.add('--workerHarnessContainerImage=')
+    }
+
+    main = "org.apache.beam.sdk.tpcds.BeamTpcds"
+    classpath = configurations.gradleRun
+    args tpcdsArgsList.toArray()
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index f94b748..43b97d2 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -18,9 +18,16 @@
 package org.apache.beam.sdk.tpcds;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -34,155 +41,198 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
+ * This class executes jobs using BeamSqlEnv; it uses BeamSqlEnv.executeDdl and
+ * BeamSqlEnv.parseQuery to run queries.
  */
 public class BeamSqlEnvRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
-
-    private static String buildTableCreateStatement(String tableName) {
-        String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
-        return createStatement;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
+
+  private static String buildTableCreateStatement(String tableName) {
+    String createStatement =
+        "CREATE EXTERNAL TABLE "
+            + tableName
+            + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
+    return createStatement;
+  }
+
+  private static String buildDataLocation(String dataSize, String tableName) {
+    String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+    return dataLocation;
+  }
+
+  /**
+   * Register all tables into BeamSqlEnv, set their schemas, and set the locations where their
+   * corresponding data are stored. Currently this method is not supported by the ZetaSQL planner.
+   */
+  private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize)
+      throws Exception {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      String createStatement = buildTableCreateStatement(tableName);
+      String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String dataLocation = buildDataLocation(dataSize, tableName);
+      env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
     }
-
-    private static String buildDataLocation(String dataSize, String tableName) {
-        String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-        return dataLocation;
+  }
+
+  /**
+   * Register all tables into InMemoryMetaStore, set their schemas, and set the locations where
+   * their corresponding data are stored.
+   */
+  private static void registerAllTablesByInMemoryMetaStore(
+      InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
+    JSONObject properties = new JSONObject();
+    properties.put("csvformat", "InformixUnload");
+
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
+      String tableName = entry.getKey();
+      String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+      Schema tableSchema = schemaMap.get(tableName);
+      Table table =
+          Table.builder()
+              .name(tableName)
+              .schema(tableSchema)
+              .location(dataLocation)
+              .properties(properties)
+              .type("text")
+              .build();
+      inMemoryMetaStore.createTable(table);
     }
-
-    /** Register all tables into BeamSqlEnv, set their schemas, and set the locations where their corresponding data are stored.
-     *  Currently this method is not supported by ZetaSQL planner. */
-    private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize) throws Exception {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            String createStatement = buildTableCreateStatement(tableName);
-            String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
-            String dataLocation = buildDataLocation(dataSize, tableName);
-            env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
-        }
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /** Register all tables into InMemoryMetaStore, set their schemas, and set the locations where their corresponding data are stored. */
-    private static void registerAllTablesByInMemoryMetaStore(InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
-        JSONObject properties = new JSONObject();
-        properties.put("csvformat", "InformixUnload");
-
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        for (String tableName : schemaMap.keySet()) {
-            String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-            Schema tableSchema = schemaMap.get(tableName);
-            Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
-            inMemoryMetaStore.createTable(table);
-        }
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+  }
+
+  /**
+   * This is the alternative to the method called in BeamTpcds.main. It runs jobs using the
+   * BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96.)
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingBeamSqlEnv(String[] args) throws Exception {
+    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+    inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+    TpcdsOptions tpcdsOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Use ExecutorService and CompletionService to run the queries in parallel.
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+    // Directly create all tables and register them into inMemoryMetaStore before creating the
+    // BeamSqlEnv object.
+    registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
+
+    BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
+    BeamSqlEnv env =
+        BeamSqlEnv.builder(inMemoryMetaStore)
+            .setPipelineOptions(beamSqlPipelineOptions)
+            .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
+            .build();
+
+    // Make an array of pipelines; each pipeline is responsible for running a corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+    // Execute all queries, transform each result into a PCollection<String>, write them into
+    // txt files, and store them in a GCP directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line arguments, cast
+      // tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for
+      // pipeline execution.
+      TpcdsOptions tpcdsOptionsCopy =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =
+          tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
+
+      // Set a unique job name using the timestamp so that multiple different pipelines can run
+      // together.
+      dataflowPipelineOptionsCopy.setJobName(
+          queryNameArr[i] + "result" + System.currentTimeMillis());
+
+      pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+      String queryString = QueryReader.readQuery(queryNameArr[i]);
+
+      try {
+        // Query execution
+        PCollection<Row> rows =
+            BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+
+        // Transform the result from PCollection<Row> into PCollection<String>, and write it to the
+        // location where results are stored.
+        PCollection<String> rowStrings =
+            rows.apply(
+                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()));
+        rowStrings.apply(
+            TextIO.write()
+                .to(
+                    RESULT_DIRECTORY
+                        + "/"
+                        + dataSize
+                        + "/"
+                        + pipelines[i].getOptions().getJobName())
+                .withSuffix(".txt")
+                .withNumShards(1));
+      } catch (Exception e) {
+        LOG.error("{} failed to execute", queryNameArr[i]);
+        e.printStackTrace();
+      }
+
+      completion.submit(new TpcdsRun(pipelines[i]));
     }
 
-    /**
-     * Print the summary table after all jobs are finished.
-     * @param completion A collection of all TpcdsRunResult that are from finished jobs.
-     * @param numOfResults The number of results in the collection.
-     * @throws Exception
-     */
-    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
-        List<List<String>> summaryRowsList = new ArrayList<>();
-        for (int i = 0; i < numOfResults; i++) {
-            TpcdsRunResult tpcdsRunResult = completion.take().get();
-            List<String> list = new ArrayList<>();
-            list.add(tpcdsRunResult.getQueryName());
-            list.add(tpcdsRunResult.getJobName());
-            list.add(tpcdsRunResult.getDataSize());
-            list.add(tpcdsRunResult.getDialect());
-            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
-            summaryRowsList.add(list);
-        }
-
-        System.out.println(SUMMARY_START);
-        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
-    }
+    executor.shutdown();
 
-    /**
-     * This is the alternative method in BeamTpcds.main method. Run job using BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96).
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void runUsingBeamSqlEnv(String[] args) throws Exception {
-        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
-        inMemoryMetaStore.registerProvider(new TextTableProvider());
-
-        TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
-        // Using ExecutorService and CompletionService to fulfill multi-threading functionality
-        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
-        // Directly create all tables and register them into inMemoryMetaStore before creating BeamSqlEnv object.
-        registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
-
-        BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
-        BeamSqlEnv env =
-                BeamSqlEnv
-                        .builder(inMemoryMetaStore)
-                        .setPipelineOptions(beamSqlPipelineOptions)
-                        .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
-                        .build();
-
-        // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
-        Pipeline[] pipelines = new Pipeline[queryNameArr.length];
-
-        // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
-        for (int i = 0; i < queryNameArr.length; i++) {
-            // For each query, get a copy of pipelineOptions from command line arguments, cast tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for pipeline execution.
-            TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-            DataflowPipelineOptions dataflowPipelineOptionsCopy = tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
-
-            // Set a unique job name using the time stamp so that multiple different pipelines can run together.
-            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
-            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
-            String queryString = QueryReader.readQuery(queryNameArr[i]);
-
-            try {
-                // Query execution
-                PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
-
-                // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
-                PCollection<String> rowStrings = rows.apply(MapElements
-                        .into(TypeDescriptors.strings())
-                        .via((Row row) -> row.toString()));
-                rowStrings.apply(TextIO.write().to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
-            } catch (Exception e) {
-                Log.error("{} failed to execute", queryNameArr[i]);
-                e.printStackTrace();
-            }
-
-            completion.submit(new TpcdsRun(pipelines[i]));
-        }
-
-        executor.shutdown();
-
-        printExecutionSummary(completion, queryNameArr.length);
-    }
+    printExecutionSummary(completion, queryNameArr.length);
+  }
 }
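BeamSqlEnvRunner above submits one TpcdsRun per pipeline and then drains results with completion.take().get(). The ExecutorCompletionService pattern can be sketched self-contained, with a plain Callable standing in for TpcdsRun and strings standing in for TpcdsRunResult (both stand-ins are hypothetical, not the Beam classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionPatternSketch {
  /** Submit one task per query and collect results in completion order. */
  public static List<String> runAll(List<String> queryNames, int nThreads) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    CompletionService<String> completion = new ExecutorCompletionService<>(executor);
    for (String name : queryNames) {
      // In the real runner this is completion.submit(new TpcdsRun(pipelines[i])).
      completion.submit(() -> name + " finished");
    }
    // shutdown() only stops new submissions; already-submitted tasks keep running.
    executor.shutdown();
    List<String> results = new ArrayList<>();
    for (int i = 0; i < queryNames.size(); i++) {
      // take() blocks until the next task completes, yielding results in completion order.
      results.add(completion.take().get());
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runAll(List.of("query3", "query26", "query55"), 2));
  }
}
```

Calling shutdown() right after the submit loop is safe here: the already-submitted pipeline runs still finish, and printExecutionSummary can consume exactly queryNameArr.length results.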
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index a7f67de..6b25f65 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,43 +17,33 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-
 /**
  * To execute this main() method, run the following example command from the command line.
  *
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- *         --queries=3,26,55 \
- *         --tpcParallel=2 \
- *         --project=apache-beam-testing \
- *         --stagingLocation=gs://beamsql_tpcds_1/staging \
- *         --tempLocation=gs://beamsql_tpcds_2/temp \
- *         --runner=DataflowRunner \
- *         --region=us-west1 \
- *         --maxNumWorkers=10"
- *
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
+ * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
+ * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
  *
- * To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+ * <p>To run a query using the ZetaSQL planner (currently query96 can be run using ZetaSQL), set
+ * the plannerName as below. If not specified, the default planner is Calcite.
  *
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- *         --queries=96 \
- *         --tpcParallel=2 \
- *         --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- *         --project=apache-beam-testing \
- *         --stagingLocation=gs://beamsql_tpcds_1/staging \
- *         --tempLocation=gs://beamsql_tpcds_2/temp \
- *         --runner=DataflowRunner \
- *         --region=us-west1 \
- *         --maxNumWorkers=10"
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
+ * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+ * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
+ * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
  */
 public class BeamTpcds {
-    /**
-     * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or BeamSqlEnvRunner.runUsingBeamSqlEnv()
-     * Currently the former has better performance so it is chosen.
-     *
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        SqlTransformRunner.runUsingSqlTransform(args);
-    }
+  /**
+   * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
+   * BeamSqlEnvRunner.runUsingBeamSqlEnv(). Currently the former has better performance, so it is
+   * chosen.
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    SqlTransformRunner.runUsingSqlTransform(args);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
index 22519b2..d66b128 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+
+import java.io.Serializable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -25,34 +28,30 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
 
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
-
 /**
- * A readConverter class for TextTable that can read csv file and transform it to PCollection<Row>
+ * A readConverter class for TextTable that can read a CSV file and transform it into a PCollection<Row>.
  */
 public class CsvToRow extends PTransform<PCollection<String>, PCollection<Row>>
-        implements Serializable {
-    private Schema schema;
-    private CSVFormat csvFormat;
+    implements Serializable {
+  private Schema schema;
+  private CSVFormat csvFormat;
 
-    public CSVFormat getCsvFormat() {
-        return csvFormat;
-    }
+  public CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
 
-    public CsvToRow(Schema schema, CSVFormat csvFormat) {
-        this.schema = schema;
-        this.csvFormat = csvFormat;
-    }
+  public CsvToRow(Schema schema, CSVFormat csvFormat) {
+    this.schema = schema;
+    this.csvFormat = csvFormat;
+  }
 
-    @Override
-    public PCollection<Row> expand(PCollection<String> input) {
-        return input
-                .apply(
-                        "csvToRow",
-                        FlatMapElements.into(TypeDescriptors.rows())
-                                .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
-                .setRowSchema(schema);
-    }
+  @Override
+  public PCollection<Row> expand(PCollection<String> input) {
+    return input
+        .apply(
+            "csvToRow",
+            FlatMapElements.into(TypeDescriptors.rows())
+                .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
+        .setRowSchema(schema);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index 1666c78..ca4cf63 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -19,41 +19,66 @@ package org.apache.beam.sdk.tpcds;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
 
 /**
- * The QueryReader reads query file (the file's extension is '.sql' and content doesn't end with a ';'), write the query as a string and return it.
+ * The QueryReader reads a query file (the file's extension is '.sql' and its content doesn't end
+ * with a ';') and returns the query as a string.
  */
 public class QueryReader {
-    /**
-     * Reads a query file (.sql), return the query as a string.
-     * @param queryFileName The name of the query file (such as "query1, query5...") which is stored in resource/queries directory
-     * @return The query string stored in this file.
-     * @throws Exception
-     */
-    public static String readQuery(String queryFileName) throws Exception {
-        // Prepare the file reader.
-        String queryFilePath = Objects.requireNonNull(QueryReader.class.getClassLoader().getResource("queries/" + queryFileName + ".sql")).getPath();
-        File queryFile = new File(queryFilePath);
-        FileReader fileReader = new FileReader(queryFile);
-        BufferedReader reader = new BufferedReader(fileReader);
-
-        // Read the file into stringBuilder.
-        StringBuilder stringBuilder = new StringBuilder();
-        String line;
-        String ls = System.getProperty("line.separator");
-        while ((line = reader.readLine()) != null) {
-            stringBuilder.append(line);
-            stringBuilder.append(ls);
-        }
-
-        // Delete the last new line separator.
-        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-        reader.close();
-
-        String queryString = stringBuilder.toString();
-
-        return queryString;
+  public static String readQuery(String queryFileName) throws Exception {
+    String path = "queries/" + queryFileName + ".sql";
+    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    return query;
+  }
+
+  /**
+   * Reads a query file (.sql) and returns the query as a string.
+   *
+   * @param queryFileName The name of the query file (such as "query1, query5...") which is stored
+   *     in resource/queries directory
+   * @return The query string stored in this file.
+   * @throws Exception
+   */
+  public static String readQuery2(String queryFileName) throws Exception {
+
+    // Prepare the file reader.
+    ClassLoader classLoader = QueryReader.class.getClassLoader();
+    if (classLoader == null) {
+      throw new RuntimeException("Can't get classloader from QueryReader.");
+    }
+    String path = "queries/" + queryFileName + ".sql";
+
+    URL resource = classLoader.getResource(path);
+    if (resource == null) {
+      throw new RuntimeException("Resource for " + path + " can't be null.");
     }
+    String queryFilePath = Objects.requireNonNull(resource).getPath();
+    File queryFile = new File(queryFilePath);
+    Reader fileReader =
+        new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
+    BufferedReader reader = new BufferedReader(fileReader);
+
+    // Read the file into stringBuilder.
+    StringBuilder stringBuilder = new StringBuilder();
+    String line;
+    String ls = System.getProperty("line.separator");
+    while ((line = reader.readLine()) != null) {
+      stringBuilder.append(line);
+      stringBuilder.append(ls);
+    }
+
+    // Delete the last new line separator.
+    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+    reader.close();
+
+    return stringBuilder.toString();
+  }
 }
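The new readQuery implementation replaces the old line-by-line FileReader loop with Resources.toString(url, charset), which reads the whole classpath resource as one UTF-8 string. A stdlib-only sketch of the same read-everything behavior (using a ByteArrayInputStream in place of a real resource stream, and a hypothetical readAll helper) would be:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ResourceReadSketch {
  /** Read an entire stream as UTF-8, as Resources.toString does for a classpath URL. */
  public static String readAll(InputStream in) throws IOException {
    return new String(in.readAllBytes(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    InputStream in =
        new ByteArrayInputStream("select 1\nfrom t".getBytes(StandardCharsets.UTF_8));
    System.out.println(readAll(in));
  }
}
```

A likely motivation for the change: reading via the resource URL also works when the queries are packaged inside a jar, where a File built from getResource().getPath() cannot be opened.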
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
index 3bae75d..40a8cc5 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+
+import java.io.Serializable;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -24,29 +27,26 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
 
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
-
 /**
- * A writeConverter class for TextTable that can transform PCollection<Row> into PCollection<String>, the format of string is determined by CSVFormat
+ * A writeConverter class for TextTable that can transform PCollection<Row> into
+ * PCollection<String>; the format of the string is determined by CSVFormat.
  */
 public class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
-        implements Serializable {
-    private CSVFormat csvFormat;
+    implements Serializable {
+  private CSVFormat csvFormat;
 
-    public RowToCsv(CSVFormat csvFormat) {
-        this.csvFormat = csvFormat;
-    }
+  public RowToCsv(CSVFormat csvFormat) {
+    this.csvFormat = csvFormat;
+  }
 
-    public CSVFormat getCsvFormat() {
-        return csvFormat;
-    }
+  public CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
 
-    @Override
-    public PCollection<String> expand(PCollection<Row> input) {
-        return input.apply(
-                "rowToCsv",
-                MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
-    }
+  @Override
+  public PCollection<String> expand(PCollection<Row> input) {
+    return input.apply(
+        "rowToCsv",
+        MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
+  }
 }
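Symmetrically, RowToCsv delegates to beamRow2CsvLine, which renders a Row's field values through the same CSVFormat. A minimal stand-in (a hypothetical joinFields over already-stringified values, without the escaping that commons-csv performs):

```java
import java.util.List;

public class RowToCsvSketch {
  /** Hypothetical stand-in for beamRow2CsvLine: pipe-join stringified field values. */
  public static String joinFields(List<String> fields) {
    return String.join("|", fields);
  }

  public static void main(String[] args) {
    System.out.println(joinFields(List.of("1", "AAAAAAAABAAAAAAA", "1997-03-13")));
  }
}
```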
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index a40cd12..34274f9 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -17,6 +17,14 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -27,155 +35,184 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.*;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using PCollection and SqlTransform, it uses SqlTransform.query to run queries.
+ * This class executes jobs using PCollection and SqlTransform; it uses SqlTransform.query to run
+ * queries.
  */
 public class SqlTransformRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = LoggerFactory.getLogger(SqlTransform.class);
-
-    /**
-     * Get all tables (in the form of TextTable) needed for a specific query execution
-     * @param pipeline The pipeline that will be run to execute the query
-     * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter (RowToCsv)
-     * @param queryName The name of the query which will be executed (for example: query3, query55, query96)
-     * @return A PCollectionTuple which is constructed by all tables needed for running query.
-     * @throws Exception
-     */
-    private static PCollectionTuple getTables(Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String queryString = QueryReader.readQuery(queryName);
-
-        PCollectionTuple tables = PCollectionTuple.empty(pipeline);
-        for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
-            String tableName = tableSchema.getKey();
-
-            // Only when queryString contains tableName, the table is relevant to this query and will be added. This can avoid reading unnecessary data files.
-            if (queryString.contains(tableName)) {
-                // This is location path where the data are stored
-                String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-
-                PCollection<Row> table =
-                        new TextTable(
-                                tableSchema.getValue(),
-                                filePattern,
-                                new CsvToRow(tableSchema.getValue(), csvFormat),
-                                new RowToCsv(csvFormat))
-                                .buildIOReader(pipeline.begin())
-                                .setCoder(SchemaCoder.of(tableSchema.getValue()))
-                                .setName(tableSchema.getKey());
-
-                tables = tables.and(new TupleTag<>(tableName), table);
-            }
-        }
-        return tables;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);
+
+  /**
+   * Get all tables (in the form of TextTable) needed for a specific query execution.
+   *
+   * @param pipeline The pipeline that will be run to execute the query
+   * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter
+   *     (RowToCsv)
+   * @param queryName The name of the query which will be executed (for example: query3, query55,
+   *     query96)
+   * @return A PCollectionTuple which is constructed from all tables needed for running the query.
+   * @throws Exception
+   */
+  private static PCollectionTuple getTables(
+      Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String queryString = QueryReader.readQuery(queryName);
+
+    PCollectionTuple tables = PCollectionTuple.empty(pipeline);
+    for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
+      String tableName = tableSchema.getKey();
+      LOG.info("Table name: {}", tableName);
+
+      // The table is relevant to this query only if queryString contains tableName; adding only
+      // such tables avoids reading unnecessary data files.
+      if (queryString.contains(tableName)) {
+        // This is the location path where the data is stored
+        String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+        LOG.info("File pattern: {}", filePattern);
+
+        PCollection<Row> table =
+            new TextTable(
+                    tableSchema.getValue(),
+                    filePattern,
+                    new CsvToRow(tableSchema.getValue(), csvFormat),
+                    new RowToCsv(csvFormat))
+                .buildIOReader(pipeline.begin())
+                .setCoder(SchemaCoder.of(tableSchema.getValue()))
+                .setName(tableSchema.getKey());
+
+        tables = tables.and(new TupleTag<>(tableName), table);
+      }
     }
-
-    /**
-     * Print the summary table after all jobs are finished.
-     * @param completion A collection of all TpcdsRunResult that are from finished jobs.
-     * @param numOfResults The number of results in the collection.
-     * @throws Exception
-     */
-    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
-        List<List<String>> summaryRowsList = new ArrayList<>();
-        for (int i = 0; i < numOfResults; i++) {
-            TpcdsRunResult tpcdsRunResult = completion.take().get();
-            List<String> list = new ArrayList<>();
-            list.add(tpcdsRunResult.getQueryName());
-            list.add(tpcdsRunResult.getJobName());
-            list.add(tpcdsRunResult.getDataSize());
-            list.add(tpcdsRunResult.getDialect());
-            // If the job is not successful, leave the run time related field blank
-            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
-            summaryRowsList.add(list);
-        }
-
-        System.out.println(SUMMARY_START);
-        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+    return tables;
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      // If the job was not successful, leave the runtime-related fields blank
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /**
-     * This is the default method in BeamTpcds.main method. Run job using SqlTranform.query() method.
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void runUsingSqlTransform(String[] args) throws Exception {
-        TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
-        // Using ExecutorService and CompletionService to fulfill multi-threading functionality
-        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
-        // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
-        Pipeline[] pipelines = new Pipeline[queryNameArr.length];
-        CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
-
-        // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
-        for (int i = 0; i < queryNameArr.length; i++) {
-            // For each query, get a copy of pipelineOptions from command line arguments.
-            TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-            // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the default one is Calcite, can change to ZetaSQL).
-            BeamSqlPipelineOptions beamSqlPipelineOptionsCopy = tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
-
-            // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set other required pipeline optionsparameters .
-            DataflowPipelineOptions dataflowPipelineOptionsCopy = beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
-
-            // Set a unique job name using the time stamp so that multiple different pipelines can run together.
-            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
-            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
-            String queryString = QueryReader.readQuery(queryNameArr[i]);
-            PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
-
-            try {
-                tables
-                        .apply(
-                                SqlTransform.query(queryString))
-                        .apply(
-                                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
-                        .apply(TextIO.write()
-                                .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
-                                .withSuffix(".txt")
-                                .withNumShards(1));
-            } catch (Exception e) {
-                Log.error("{} failed to execute", queryNameArr[i]);
-                e.printStackTrace();
-            }
-
-            completion.submit(new TpcdsRun(pipelines[i]));
-        }
-
-        executor.shutdown();
-
-        printExecutionSummary(completion, queryNameArr.length);
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+  }
+
+  /**
+   * This is the default method in the BeamTpcds.main method. It runs the jobs using the
+   * SqlTransform.query() method.
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingSqlTransform(String[] args) throws Exception {
+    TpcdsOptions tpcdsOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Use an ExecutorService and a CompletionService to run the queries on multiple threads
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+    // Make an array of pipelines; each pipeline is responsible for running a corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+    CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
+
+    // Execute all queries, transform each result into a PCollection<String>, and write it to a
+    // txt file stored in a GCS directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line arguments.
+      TpcdsOptions tpcdsOptionsCopy =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+      // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the
+      // default one is Calcite, can change to ZetaSQL).
+      BeamSqlPipelineOptions beamSqlPipelineOptionsCopy =
+          tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
+
+      // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set
+      // other required pipeline options.
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =
+          beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
+
+      // Set a unique job name using the timestamp so that multiple different pipelines can run
+      // together.
+      dataflowPipelineOptionsCopy.setJobName(
+          queryNameArr[i] + "result" + System.currentTimeMillis());
+
+      pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+      String queryString = QueryReader.readQuery(queryNameArr[i]);
+      PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
+
+      try {
+        tables
+            .apply(SqlTransform.query(queryString))
+            .apply(MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+            .apply(
+                TextIO.write()
+                    .to(
+                        RESULT_DIRECTORY
+                            + "/"
+                            + dataSize
+                            + "/"
+                            + pipelines[i].getOptions().getJobName())
+                    .withSuffix(".txt")
+                    .withNumShards(1));
+      } catch (Exception e) {
+        LOG.error("{} failed to execute", queryNameArr[i], e);
+      }
+
+      completion.submit(new TpcdsRun(pipelines[i]));
     }
+
+    executor.shutdown();
+
+    printExecutionSummary(completion, queryNameArr.length);
+  }
 }
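As a side note for reviewers: the submit-then-take pattern used in runUsingSqlTransform above can be sketched in isolation with plain java.util.concurrent. The class and method names below are illustrative stand-ins (a Callable plays the role of TpcdsRun), not part of this change:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the ExecutorCompletionService pattern used above:
// submit one task per query, then take() results in completion order.
public class CompletionSketch {
  public static List<String> runAll(List<String> queryNames, int nThreads) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    CompletionService<String> completion = new ExecutorCompletionService<>(executor);
    for (String queryName : queryNames) {
      // A Callable stands in for TpcdsRun here.
      completion.submit(() -> queryName + " finished");
    }
    // shutdown() stops accepting new tasks but lets the submitted ones finish.
    executor.shutdown();
    List<String> results = new ArrayList<>();
    for (int i = 0; i < queryNames.size(); i++) {
      // take() blocks until the next task completes, regardless of submission order.
      results.add(completion.take().get());
    }
    return results;
  }
}
```

This is why printExecutionSummary can simply call completion.take() numOfResults times: results arrive as each pipeline finishes, not in the order the queries were submitted.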
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
index bddb6a8..36f7a28 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
@@ -20,134 +20,157 @@ package org.apache.beam.sdk.tpcds;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
-/**
- * Generate the tpcds queries execution summary on the command line after finishing all jobs.
- */
+/** Generates the TPC-DS query execution summary on the command line after all jobs finish. */
 public class SummaryGenerator {
-    private static final int PADDING_SIZE = 2;
-    private static final String NEW_LINE = "\n";
-    private static final String TABLE_JOINT_SYMBOL = "+";
-    private static final String TABLE_V_SPLIT_SYMBOL = "|";
-    private static final String TABLE_H_SPLIT_SYMBOL = "-";
+  private static final int PADDING_SIZE = 2;
+  private static final String NEW_LINE = "\n";
+  private static final String TABLE_JOINT_SYMBOL = "+";
+  private static final String TABLE_V_SPLIT_SYMBOL = "|";
+  private static final String TABLE_H_SPLIT_SYMBOL = "-";
 
-    public static String generateTable(List<String> headersList, List<List<String>> rowsList,int... overRiddenHeaderHeight) {
-        StringBuilder stringBuilder = new StringBuilder();
+  public static String generateTable(
+      List<String> headersList, List<List<String>> rowsList, int... overRiddenHeaderHeight) {
+    StringBuilder stringBuilder = new StringBuilder();
 
-        int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
+    int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
 
-        Map<Integer,Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
+    Map<Integer, Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
 
-        stringBuilder.append(NEW_LINE);
-        stringBuilder.append(NEW_LINE);
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
-        stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
 
-        for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
-            fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
-        }
-
-        stringBuilder.append(NEW_LINE);
+    for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
+      fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
+    }
 
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
 
-        for (List<String> row : rowsList) {
-            for (int i = 0; i < rowHeight; i++) {
-                stringBuilder.append(NEW_LINE);
-            }
-            for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
-                fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
-            }
-        }
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
 
+    for (List<String> row : rowsList) {
+      for (int i = 0; i < rowHeight; i++) {
         stringBuilder.append(NEW_LINE);
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
-        stringBuilder.append(NEW_LINE);
-        stringBuilder.append(NEW_LINE);
-
-        return stringBuilder.toString();
+      }
+      for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
+        fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
+      }
     }
 
-    private static void fillSpace(StringBuilder stringBuilder, int length) {
-        for (int i = 0; i < length; i++) {
-            stringBuilder.append(" ");
-        }
-    }
+    stringBuilder.append(NEW_LINE);
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
 
-    /** Add a rowLine at the beginning, the middle between headersList and rowLists, the end of the summary table. */
-    private static void createRowLine(StringBuilder stringBuilder,int headersListSize, Map<Integer,Integer> columnMaxWidthMapping) {
-        for (int i = 0; i < headersListSize; i++) {
-            if(i == 0) {
-                stringBuilder.append(TABLE_JOINT_SYMBOL);
-            }
-
-            for (int j = 0; j < columnMaxWidthMapping.get(i) + PADDING_SIZE * 2 ; j++) {
-                stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
-            }
-            stringBuilder.append(TABLE_JOINT_SYMBOL);
-        }
-    }
+    return stringBuilder.toString();
+  }
 
-    /** Get the width of the summary table. */
-    private static Map<Integer,Integer> getMaximumWidthofTable(List<String> headersList, List<List<String>> rowsList) {
-        Map<Integer,Integer> columnMaxWidthMapping = new HashMap<>();
+  private static void fillSpace(StringBuilder stringBuilder, int length) {
+    for (int i = 0; i < length; i++) {
+      stringBuilder.append(" ");
+    }
+  }
+
+  /**
+   * Add a row line at the beginning of the summary table, between the headers and the rows, and at
+   * the end.
+   */
+  private static void createRowLine(
+      StringBuilder stringBuilder,
+      int headersListSize,
+      Map<Integer, Integer> columnMaxWidthMapping) {
+    for (int i = 0; i < headersListSize; i++) {
+      if (i == 0) {
+        stringBuilder.append(TABLE_JOINT_SYMBOL);
+      }
+
+      int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(i)).orElse(0);
+      for (int j = 0; j < columnMaxWidth + PADDING_SIZE * 2; j++) {
+        stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
+      }
+      stringBuilder.append(TABLE_JOINT_SYMBOL);
+    }
+  }
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            columnMaxWidthMapping.put(columnIndex, 0);
-        }
+  /** Get the maximum width of each column of the summary table. */
+  private static Map<Integer, Integer> getMaximumWidthofTable(
+      List<String> headersList, List<List<String>> rowsList) {
+    Map<Integer, Integer> columnMaxWidthMapping = new HashMap<>();
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            if(headersList.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
-                columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
-            }
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      columnMaxWidthMapping.put(columnIndex, 0);
+    }
 
-        for (List<String> row : rowsList) {
-            for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
-                if(row.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
-                    columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
-                }
-            }
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      Integer columnMaxWidth =
+          Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+      if (headersList.get(columnIndex).length() > columnMaxWidth) {
+        columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
+      }
+    }
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            if(columnMaxWidthMapping.get(columnIndex) % 2 != 0) {
-                columnMaxWidthMapping.put(columnIndex, columnMaxWidthMapping.get(columnIndex) + 1);
-            }
+    for (List<String> row : rowsList) {
+      for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
+        Integer columnMaxWidth =
+            Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+        if (row.get(columnIndex).length() > columnMaxWidth) {
+          columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
         }
-
-        return columnMaxWidthMapping;
+      }
     }
 
-    private static int getOptimumCellPadding(int cellIndex,int datalength,Map<Integer,Integer> columnMaxWidthMapping,int cellPaddingSize) {
-        if(datalength % 2 != 0) {
-            datalength++;
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+      if (columnMaxWidth % 2 != 0) {
+        columnMaxWidthMapping.put(columnIndex, columnMaxWidth + 1);
+      }
+    }
 
-        if(datalength < columnMaxWidthMapping.get(cellIndex)) {
-            cellPaddingSize = cellPaddingSize + (columnMaxWidthMapping.get(cellIndex) - datalength) / 2;
-        }
+    return columnMaxWidthMapping;
+  }
 
-        return cellPaddingSize;
+  private static int getOptimumCellPadding(
+      int cellIndex,
+      int datalength,
+      Map<Integer, Integer> columnMaxWidthMapping,
+      int cellPaddingSize) {
+    if (datalength % 2 != 0) {
+      datalength++;
     }
 
-    /** Use space to fill a single cell with optimum cell padding size. */
-    private static void fillCell(StringBuilder stringBuilder,String cell,int cellIndex,Map<Integer,Integer> columnMaxWidthMapping) {
+    int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(cellIndex)).orElse(0);
+    if (datalength < columnMaxWidth) {
+      cellPaddingSize = cellPaddingSize + (columnMaxWidth - datalength) / 2;
+    }
 
-        int cellPaddingSize = getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
+    return cellPaddingSize;
+  }
 
-        if(cellIndex == 0) {
-            stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
-        }
+  /** Use space to fill a single cell with optimum cell padding size. */
+  private static void fillCell(
+      StringBuilder stringBuilder,
+      String cell,
+      int cellIndex,
+      Map<Integer, Integer> columnMaxWidthMapping) {
 
-        fillSpace(stringBuilder, cellPaddingSize);
-        stringBuilder.append(cell);
-        if(cell.length() % 2 != 0) {
-            stringBuilder.append(" ");
-        }
+    int cellPaddingSize =
+        getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
 
-        fillSpace(stringBuilder, cellPaddingSize);
+    if (cellIndex == 0) {
+      stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+    }
 
-        stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+    fillSpace(stringBuilder, cellPaddingSize);
+    stringBuilder.append(cell);
+    if (cell.length() % 2 != 0) {
+      stringBuilder.append(" ");
     }
+
+    fillSpace(stringBuilder, cellPaddingSize);
+
+    stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+  }
 }
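For reviewers, the cell-padding arithmetic in getOptimumCellPadding above can be checked in isolation. This standalone class is illustrative only; it reproduces the same computation with the same PADDING_SIZE constant:

```java
// Standalone reproduction of the padding rule in SummaryGenerator:
// round odd cell lengths up to even, then grow the base padding by half
// of the remaining gap to the column's maximum width.
public class PaddingSketch {
  private static final int PADDING_SIZE = 2;

  public static int optimumCellPadding(int dataLength, int columnMaxWidth) {
    if (dataLength % 2 != 0) {
      dataLength++;
    }
    int cellPaddingSize = PADDING_SIZE;
    if (dataLength < columnMaxWidth) {
      cellPaddingSize += (columnMaxWidth - dataLength) / 2;
    }
    return cellPaddingSize;
  }
}
```

Because column widths are rounded up to even in getMaximumWidthofTable and cell lengths are rounded up here, the gap split in half is always a whole number, which keeps the table borders aligned.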
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 420386c..3a2371d 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,96 +17,127 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-
 import java.io.File;
-import java.io.FileReader;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.ArrayList;
-
+import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 
 /**
- * TableSchemaJSONLoader can get all table's names from resource/schemas directory and parse a table's schema into a string.
+ * TableSchemaJSONLoader can get all tables' names from the resource/schemas directory and parse a
+ * table's schema into a string.
  */
 public class TableSchemaJSONLoader {
-    /**
-     * Read a table schema json file from resource/schemas directory, parse the file into a string which can be utilized by BeamSqlEnv.executeDdl method.
-     * @param tableName The name of the json file to be read (fo example: item, store_sales).
-     * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk bigint, d_date_id varchar"
-     * @throws Exception
-     */
-    public static String parseTableSchema(String tableName) throws Exception {
-        String tableFilePath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas/" + tableName +".json")).getPath();
+  /** Read a table schema json file from the resource/schemas directory into a string. */
+  public static String readQuery(String tableName) throws Exception {
+    String path = "schemas/" + tableName + ".json";
+    return Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+  }
 
-        JSONObject jsonObject = (JSONObject) new JSONParser().parse(new FileReader(new File(tableFilePath)));
-        JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+  /**
+   * Read a table schema json file from the resource/schemas directory and parse the file into a
+   * string which can be utilized by the BeamSqlEnv.executeDdl method.
+   *
+   * @param tableName The name of the json file to be read (for example: item, store_sales).
+   * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk
+   *     bigint, d_date_id varchar"
+   * @throws Exception
+   */
+  @SuppressWarnings({"rawtypes", "DefaultCharset"})
+  public static String parseTableSchema(String tableName) throws Exception {
+    String path = "schemas/" + tableName + ".json";
+    String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
 
-        // Iterate each element in jsonArray to construct the schema string
-        StringBuilder schemaStringBuilder = new StringBuilder();
+    JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
+    JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+    if (jsonArray == null) {
+      throw new RuntimeException("Can't get Json array for \"schema\" key.");
+    }
 
-        Iterator jsonArrIterator = jsonArray.iterator();
-        Iterator<Map.Entry> recordIterator;
-        while (jsonArrIterator.hasNext()) {
-            recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
-            while (recordIterator.hasNext()) {
-                Map.Entry pair = recordIterator.next();
+    // Iterate each element in jsonArray to construct the schema string
+    StringBuilder schemaStringBuilder = new StringBuilder();
 
-                if (pair.getKey().equals("type")) {
-                    // If the key of the pair is "type", make some modification before appending it to the schemaStringBuilder, then append a comma.
-                    String typeName = (String) pair.getValue();
-                    if (typeName.toLowerCase().equals("identifier") || typeName.toLowerCase().equals("integer")) {
-                        // Use long type to represent int, prevent overflow
-                        schemaStringBuilder.append("bigint");
-                    } else if (typeName.contains("decimal")) {
-                        // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it for now.
-                        schemaStringBuilder.append("double");
-                    } else {
-                        // Currently Beam SQL doesn't handle "date" type properly, use "varchar" replace it for now.
-                        schemaStringBuilder.append("varchar");
-                    }
-                    schemaStringBuilder.append(',');
-                } else {
-                    // If the key of the pair is "name", directly append it to the StringBuilder, then append a space.
-                    schemaStringBuilder.append((pair.getValue()));
-                    schemaStringBuilder.append(' ');
-                }
-            }
-        }
+    Iterator jsonArrIterator = jsonArray.iterator();
+    Iterator<Map.Entry> recordIterator;
+    while (jsonArrIterator.hasNext()) {
+      recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
+      while (recordIterator.hasNext()) {
+        Map.Entry pair = recordIterator.next();
 
-        // Delete the last ',' in schema string
-        if (schemaStringBuilder.length() > 0) {
-            schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
+        if (pair.getKey().equals("type")) {
+          // If the key of the pair is "type", make some modification before appending it to the
+          // schemaStringBuilder, then append a comma.
+          String typeName = (String) pair.getValue();
+          if (typeName.toLowerCase().equals("identifier")
+              || typeName.toLowerCase().equals("integer")) {
+            // Use long type to represent int, prevent overflow
+            schemaStringBuilder.append("bigint");
+          } else if (typeName.contains("decimal")) {
+            // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it
+            // for now.
+            schemaStringBuilder.append("double");
+          } else {
+            // Currently Beam SQL doesn't handle "date" type properly, use "varchar" to replace it
+            // for now.
+            schemaStringBuilder.append("varchar");
+          }
+          schemaStringBuilder.append(',');
+        } else {
+          // If the key of the pair is "name", directly append it to the StringBuilder, then append
+          // a space.
+          schemaStringBuilder.append((pair.getValue()));
+          schemaStringBuilder.append(' ');
         }
+      }
+    }
 
-        String schemaString = schemaStringBuilder.toString();
-
-        return schemaString;
+    // Delete the last ',' in schema string
+    if (schemaStringBuilder.length() > 0) {
+      schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
     }
 
-    /**
-     * Get all tables' names. Tables are stored in resource/schemas directory in the form of json files, such as "item.json", "store_sales.json", they'll be converted to "item", "store_sales".
-     * @return The list of names of all tables.
-     */
-    public static List<String> getAllTableNames() {
-        String tableDirPath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas")).getPath();
-        File tableDir = new File(tableDirPath);
-        File[] tableDirListing = tableDir.listFiles();
+    String schemaString = schemaStringBuilder.toString();
 
-        List<String> tableNames = new ArrayList<>();
+    return schemaString;
+  }
 
-        if (tableDirListing != null) {
-            for (File file : tableDirListing) {
-                // Remove the .json extension in file name
-                tableNames.add(FileNameUtils.getBaseName((file.getName())));
-            }
-        }
+  /**
+   * Get all tables' names. Tables are stored in the resource/schemas directory as json files such
+   * as "item.json" and "store_sales.json"; they'll be converted to "item" and "store_sales".
+   *
+   * @return The list of names of all tables.
+   */
+  public static List<String> getAllTableNames() {
+    ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
+    if (classLoader == null) {
+      throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
+    }
+    URL resource = classLoader.getResource("schemas");
+    if (resource == null) {
+      throw new RuntimeException("Resource for \"schemas\" can't be null.");
+    }
+    String tableDirPath = Objects.requireNonNull(resource).getPath();
+    File tableDir = new File(tableDirPath);
+    File[] tableDirListing = tableDir.listFiles();
 
-        return tableNames;
+    List<String> tableNames = new ArrayList<>();
+
+    if (tableDirListing != null) {
+      for (File file : tableDirListing) {
+        // Remove the .json extension in file name
+        tableNames.add(FileNameUtils.getBaseName((file.getName())));
+      }
     }
+
+    return tableNames;
+  }
 }
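[Note for readers of this patch: the string-building loop in parseTableSchema above boils down to the following standalone sketch. The class name, the plain `Map<String, String>` input, and the sample column names are hypothetical — the real code iterates a parsed JSON array — but the type mapping (identifier/integer to bigint, decimal to double, everything else to varchar) and the trailing-comma trim are the same.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaStringSketch {
  // Map each column's JSON "type" to a Beam SQL type name, append "name type,"
  // pairs, then delete the trailing comma — mirroring parseTableSchema above.
  static String buildSchemaString(Map<String, String> columns) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> column : columns.entrySet()) {
      sb.append(column.getKey()).append(' ');
      String type = column.getValue().toLowerCase();
      if (type.equals("identifier") || type.equals("integer")) {
        sb.append("bigint"); // use long to represent int, preventing overflow
      } else if (type.contains("decimal")) {
        sb.append("double"); // workaround for Beam SQL's decimal handling
      } else {
        sb.append("varchar"); // date and character types fall back to varchar
      }
      sb.append(',');
    }
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1); // delete the last ','
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> cols = new LinkedHashMap<>();
    cols.put("i_item_sk", "identifier");
    cols.put("i_current_price", "decimal(7,2)");
    cols.put("i_rec_start_date", "date");
    System.out.println(buildSchemaString(cols));
  }
}
```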
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index 1c567dd..c693dfd 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -21,22 +21,24 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-/** Options used to configure TPC-DS test */
+/** Options used to configure TPC-DS test. */
 public interface TpcdsOptions extends PipelineOptions {
-    @Description("The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
-    String getDataSize();
+  @Description(
+      "The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
+  String getDataSize();
 
-    void setDataSize(String dataSize);
+  void setDataSize(String dataSize);
 
-    // Set the return type to be String since reading from the command line (user input will be like "1,2,55" which represent TPC-DS query1, query3, query55)
-    @Description("The queries numbers, read user input as string, numbers separated by commas")
-    String getQueries();
+  // Set the return type to be String since it is read from the command line (user input will be
+  // like "1,2,55", which represents TPC-DS query1, query2, query55)
+  @Description("The query numbers, read from user input as a string, numbers separated by commas")
+  String getQueries();
 
-    void setQueries(String queries);
+  void setQueries(String queries);
 
-    @Description("The number of queries to run in parallel")
-    @Default.Integer(1)
-    Integer getTpcParallel();
+  @Description("The number of queries to run in parallel")
+  @Default.Integer(1)
+  Integer getTpcParallel();
 
-    void setTpcParallel(Integer parallelism);
+  void setTpcParallel(Integer parallelism);
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
index d1ddc9d..40f4f86 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
@@ -24,10 +24,10 @@ import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.Immutabl
 
 /** {@link AutoService} registrar for {@link TpcdsOptions}. */
 @AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
+public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar {
 
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-        return ImmutableList.of(TpcdsOptions.class);
-    }
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.of(TpcdsOptions.class);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
index c281341..8928292 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
@@ -22,86 +22,88 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/**
- * Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid
- */
+/** Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid. */
 public class TpcdsParametersReader {
-    /** The data sizes that have been supported. */
-    private static final Set<String> supportedDataSizes = Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
-
-    /**
-     * Get and check dataSize entered by user. This dataSize has to have been supported.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return The dateSize user entered, if it is contained in supportedDataSizes set.
-     * @throws Exception
-     */
-    public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
-        String dataSize = tpcdsOptions.getDataSize();
+  /** The data sizes that have been supported. */
+  private static final Set<String> supportedDataSizes =
+      Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
 
-        if (!supportedDataSizes.contains(dataSize)) {
-            throw new Exception("The data size you entered has not been supported.");
-        }
+  /**
+   * Get and check the dataSize entered by the user. This dataSize has to be supported.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return The dataSize the user entered, if it is contained in the supportedDataSizes set.
+   * @throws Exception
+   */
+  public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
+    String dataSize = tpcdsOptions.getDataSize();
 
-        return dataSize;
+    if (!supportedDataSizes.contains(dataSize)) {
+      throw new Exception("The data size you entered is not supported.");
     }
 
-    /**
-     * Get and check queries entered by user. This has to be a string of numbers separated by commas or "all" which means run all 99 queiries.
-     * All query numbers have to be between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
-     * @throws Exception
-     */
-    public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
-        String queryNums = tpcdsOptions.getQueries();
+    return dataSize;
+  }
 
-        String[] queryNumArr;
-        if (queryNums.toLowerCase().equals("all")) {
-            // All 99 TPC-DS queries need to be executed.
-            queryNumArr = new String[99];
-            for (int i = 0; i < 99; i++) {
-                queryNumArr[i] = Integer.toString(i + 1);
-            }
-        } else {
-            // Split user input queryNums by spaces and commas, get an array of all query numbers.
-            queryNumArr = queryNums.split("[\\s,]+");
+  /**
+   * Get and check the queries entered by the user. This has to be a string of numbers separated by
+   * commas, or "all", which means run all 99 queries. All query numbers must be between 1 and 99.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
+   * @throws Exception
+   */
+  public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
+    String queryNums = tpcdsOptions.getQueries();
 
-            for (String queryNumStr : queryNumArr) {
-                try {
-                    int queryNum = Integer.parseInt(queryNumStr);
-                    if (queryNum < 1 || queryNum > 99) {
-                        throw new Exception("The queries you entered contains invalid query number, please provide integers between 1 and 99.");
-                    }
-                } catch (NumberFormatException e) {
-                    System.out.println("The queries you entered should be integers, please provide integers between 1 and 99.");
-                }
-            }
-        }
+    String[] queryNumArr;
+    if (queryNums.toLowerCase().equals("all")) {
+      // All 99 TPC-DS queries need to be executed.
+      queryNumArr = new String[99];
+      for (int i = 0; i < 99; i++) {
+        queryNumArr[i] = Integer.toString(i + 1);
+      }
+    } else {
+      // Split user input queryNums by spaces and commas, get an array of all query numbers.
+      queryNumArr = queryNums.split("[\\s,]+");
 
-        String[] queryNameArr = new String[queryNumArr.length];
-        for (int i = 0; i < queryNumArr.length; i++) {
-            queryNameArr[i] = "query" + queryNumArr[i];
+      for (String queryNumStr : queryNumArr) {
+        try {
+          int queryNum = Integer.parseInt(queryNumStr);
+          if (queryNum < 1 || queryNum > 99) {
+            throw new Exception(
+                "The queries you entered contain an invalid query number, please provide integers between 1 and 99.");
+          }
+        } catch (NumberFormatException e) {
+          System.out.println(
+              "The queries you entered should be integers, please provide integers between 1 and 99.");
         }
+      }
+    }
 
-        return queryNameArr;
+    String[] queryNameArr = new String[queryNumArr.length];
+    for (int i = 0; i < queryNumArr.length; i++) {
+      queryNameArr[i] = "query" + queryNumArr[i];
     }
 
-    /**
-     * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input.
-     * @return The TpcParallel user entered.
-     * @throws Exception
-     */
-    public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
-        int nThreads = tpcdsOptions.getTpcParallel();
+    return queryNameArr;
+  }
 
-        if (nThreads < 1 || nThreads > 99) {
-            throw new Exception("The TpcParallel your entered is invalid, please provide an integer between 1 and 99.");
-        }
+  /**
+   * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input.
+   * @return The TpcParallel user entered.
+   * @throws Exception
+   */
+  public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
+    int nThreads = tpcdsOptions.getTpcParallel();
 
-        return nThreads;
+    if (nThreads < 1 || nThreads > 99) {
+      throw new Exception(
+          "The TpcParallel you entered is invalid, please provide an integer between 1 and 99.");
     }
+
+    return nThreads;
+  }
 }
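[Note for readers of this patch: the query-selection logic in getAndCheckQueryNameArray above can be sketched standalone as below. The class name is hypothetical, and one design choice differs: the original catches NumberFormatException and only prints a message (so a non-integer token slips through to the name array), while this sketch lets the parse failure propagate.]

```java
import java.util.ArrayList;
import java.util.List;

public class QuerySelectionSketch {
  // "all" expands to queries 1..99; otherwise the input is split on spaces
  // and commas, each token is range-checked, and "query" is prepended.
  static List<String> toQueryNames(String input) {
    List<String> names = new ArrayList<>();
    if (input.equalsIgnoreCase("all")) {
      for (int i = 1; i <= 99; i++) {
        names.add("query" + i);
      }
      return names;
    }
    for (String token : input.split("[\\s,]+")) {
      int n = Integer.parseInt(token); // throws NumberFormatException for non-integers
      if (n < 1 || n > 99) {
        throw new IllegalArgumentException("Query numbers must be between 1 and 99: " + token);
      }
      names.add("query" + token);
    }
    return names;
  }
}
```

Usage: `toQueryNames("1,2,55")` yields the names `query1`, `query2`, `query55`, matching the javadoc example above.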
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
index 1070a88..4b80aa8 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
@@ -17,40 +17,42 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.util.concurrent.Callable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
-import java.util.concurrent.Callable;
 
-/**
- * To fulfill multi-threaded execution
- */
+/** To fulfill multi-threaded execution. */
 public class TpcdsRun implements Callable<TpcdsRunResult> {
-    private final Pipeline pipeline;
+  private final Pipeline pipeline;
 
-    public TpcdsRun (Pipeline pipeline) {
-        this.pipeline = pipeline;
-    }
-
-    @Override
-    public TpcdsRunResult call() {
-        TpcdsRunResult tpcdsRunResult;
+  public TpcdsRun(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
 
-        try {
-            PipelineResult pipelineResult = pipeline.run();
-            long startTimeStamp = System.currentTimeMillis();
-            State state = pipelineResult.waitUntilFinish();
-            long endTimeStamp = System.currentTimeMillis();
+  @Override
+  public TpcdsRunResult call() {
+    TpcdsRunResult tpcdsRunResult;
 
-            // Make sure to set the job status to be successful only when pipelineResult's final state is DONE.
-            boolean isSuccessful = state == State.DONE;
-            tpcdsRunResult = new TpcdsRunResult(isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
-        } catch (Exception e) {
-            // If the pipeline execution failed, return a result with failed status but don't interrupt other threads.
-            e.printStackTrace();
-            tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
-        }
+    try {
+      PipelineResult pipelineResult = pipeline.run();
+      long startTimeStamp = System.currentTimeMillis();
+      State state = pipelineResult.waitUntilFinish();
+      long endTimeStamp = System.currentTimeMillis();
 
-        return tpcdsRunResult;
+      // Make sure to set the job status to be successful only when pipelineResult's final state is
+      // DONE.
+      boolean isSuccessful = state == State.DONE;
+      tpcdsRunResult =
+          new TpcdsRunResult(
+              isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
+    } catch (Exception e) {
+      // If the pipeline execution failed, return a result with failed status but don't interrupt
+      // other threads.
+      e.printStackTrace();
+      tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
     }
+
+    return tpcdsRunResult;
+  }
 }
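[Note for readers of this patch: TpcdsRun.call() above follows a simple pattern — time the work with wall-clock stamps, and swallow failures so one bad query doesn't interrupt sibling threads. A minimal sketch of that pattern, with a hypothetical class name and a sentinel return value standing in for the failed TpcdsRunResult:]

```java
public class TimedRunSketch {
  // Run the work, returning elapsed seconds on success (the same
  // (end - start) / 1000.0 formula as getElapsedTime()), or -1.0 on
  // failure instead of rethrowing — so sibling tasks keep running.
  static double timeSeconds(Runnable work) {
    long start = System.currentTimeMillis();
    try {
      work.run();
    } catch (Exception e) {
      return -1.0; // failed run: report a sentinel, don't propagate
    }
    long end = System.currentTimeMillis();
    return (end - start) / 1000.0;
  }
}
```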
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
index 0e22dce..c89a34a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -17,76 +17,89 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.sql.Timestamp;
+import java.util.Date;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import java.sql.Timestamp;
-import java.util.Date;
-
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class TpcdsRunResult {
-    private boolean isSuccessful;
-    private long startTime;
-    private long endTime;
-    private PipelineOptions pipelineOptions;
-    private PipelineResult pipelineResult;
+  private final boolean isSuccessful;
+  private final long startTime;
+  private final long endTime;
+  private final PipelineOptions pipelineOptions;
+  private final @Nullable PipelineResult pipelineResult;
 
-    public TpcdsRunResult(boolean isSuccessful, long startTime, long endTime, PipelineOptions pipelineOptions, PipelineResult pipelineResult) {
-        this.isSuccessful = isSuccessful;
-        this.startTime = startTime;
-        this.endTime = endTime;
-        this.pipelineOptions = pipelineOptions;
-        this.pipelineResult = pipelineResult;
-    }
+  public TpcdsRunResult(
+      boolean isSuccessful,
+      long startTime,
+      long endTime,
+      PipelineOptions pipelineOptions,
+      @Nullable PipelineResult pipelineResult) {
+    this.isSuccessful = isSuccessful;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.pipelineOptions = pipelineOptions;
+    this.pipelineResult = pipelineResult;
+  }
 
-    public boolean getIsSuccessful() { return isSuccessful; }
+  public boolean getIsSuccessful() {
+    return isSuccessful;
+  }
 
-    public Date getStartDate() {
-        Timestamp startTimeStamp = new Timestamp(startTime);
-        Date startDate = new Date(startTimeStamp.getTime());
-        return startDate;
-    }
+  public Date getStartDate() {
+    Timestamp startTimeStamp = new Timestamp(startTime);
+    Date startDate = new Date(startTimeStamp.getTime());
+    return startDate;
+  }
 
-    public Date getEndDate() {
-        Timestamp endTimeStamp = new Timestamp(endTime);
-        Date endDate = new Date(endTimeStamp.getTime());
-        return endDate;
-    }
+  public Date getEndDate() {
+    Timestamp endTimeStamp = new Timestamp(endTime);
+    Date endDate = new Date(endTimeStamp.getTime());
+    return endDate;
+  }
 
-    public double getElapsedTime() {
-        return (endTime - startTime) / 1000.0;
-    }
+  public double getElapsedTime() {
+    return (endTime - startTime) / 1000.0;
+  }
 
-    public PipelineOptions getPipelineOptions() { return pipelineOptions; }
+  public PipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
 
-    public PipelineResult getPipelineResult() { return pipelineResult; }
+  public @Nullable PipelineResult getPipelineResult() {
+    return pipelineResult;
+  }
 
-    public String getJobName() {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        return pipelineOptions.getJobName();
-    }
+  public String getJobName() {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    return pipelineOptions.getJobName();
+  }
 
-    public String getQueryName() {
-        String jobName = getJobName();
-        int endIndex = jobName.indexOf("result");
-        String queryName = jobName.substring(0, endIndex);
-        return queryName;
-    }
+  public String getQueryName() {
+    String jobName = getJobName();
+    int endIndex = jobName.indexOf("result");
+    String queryName = jobName.substring(0, endIndex);
+    return queryName;
+  }
 
-    public String getDataSize() throws Exception {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
-    }
+  public String getDataSize() throws Exception {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
+  }
 
-    public String getDialect() throws Exception {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        String queryPlannerClassName = pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
-        String dialect;
-        if (queryPlannerClassName.equals("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
-            dialect = "ZetaSQL";
-        } else {
-            dialect = "Calcite";
-        }
-        return dialect;
+  public String getDialect() throws Exception {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    String queryPlannerClassName =
+        pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
+    String dialect;
+    if (queryPlannerClassName.equals(
+        "org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
+      dialect = "ZetaSQL";
+    } else {
+      dialect = "Calcite";
     }
+    return dialect;
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
index b776551..7f3d874 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
@@ -17,656 +17,706 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 public class TpcdsSchemas {
-    /**
-     * Get all tpcds table schemas automatically by reading json files.
-     * In this case all field will be nullable, this is a bit different from the tpcds specification, but doesn't affect query execution.
-     *
-     * @return A map of all tpcds table schemas with their table names as keys.
-     * @throws Exception
-     */
-    public static Map<String, Schema> getTpcdsSchemas() throws Exception {
-        Map<String, Schema> schemaMap = new HashMap<>();
-
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            Schema.Builder schemaBuilder = Schema.builder();
-
-            String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
-            String[] nameTypePairs = tableSchemaString.split(",");
-
-            for (String nameTypePairString : nameTypePairs) {
-                String[] nameTypePair = nameTypePairString.split("\\s+");
-                String name = nameTypePair[0];
-                String type = nameTypePair[1];
-
-                Schema.FieldType fieldType;
-                if (type.equals("bigint")) {
-                    fieldType = Schema.FieldType.INT64;
-                } else if (type.equals("double")) {
-                    fieldType = Schema.FieldType.DOUBLE;
-                } else {
-                    fieldType = Schema.FieldType.STRING;
-                }
-
-                schemaBuilder.addNullableField(name, fieldType);
-            }
-
-            Schema tableSchema = schemaBuilder.build();
-            schemaMap.put(tableName,tableSchema);
+  /**
+   * Get all tpcds table schemas automatically by reading json files. In this case all fields will
+   * be nullable; this is a bit different from the tpcds specification, but doesn't affect query
+   * execution.
+   *
+   * @return A map of all tpcds table schemas with their table names as keys.
+   * @throws Exception
+   */
+  @SuppressWarnings("StringSplitter")
+  public static Map<String, Schema> getTpcdsSchemas() throws Exception {
+    Map<String, Schema> schemaMap = new HashMap<>();
+
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      Schema.Builder schemaBuilder = Schema.builder();
+
+      String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String[] nameTypePairs = tableSchemaString.split(",");
+
+      for (String nameTypePairString : nameTypePairs) {
+        String[] nameTypePair = nameTypePairString.split("\\s+");
+        String name = nameTypePair[0];
+        String type = nameTypePair[1];
+
+        Schema.FieldType fieldType;
+        if (type.equals("bigint")) {
+          fieldType = Schema.FieldType.INT64;
+        } else if (type.equals("double")) {
+          fieldType = Schema.FieldType.DOUBLE;
+        } else {
+          fieldType = Schema.FieldType.STRING;
         }
-        return schemaMap;
-    }
 
-    /**
-     * Get all tpcds table schemas according to tpcds official specification. Some fields are set to be not nullable.
-     *
-     * @return A map of all tpcds table schemas with their table names as keys.
-     */
-    public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
-        ImmutableMap<String, Schema> immutableSchemaMap =
-                ImmutableMap.<String, Schema> builder()
-                        .put("call_center", callCenterSchema)
-                        .put("catalog_page", catalogPageSchema)
-                        .put("catalog_returns", catalogReturnsSchema)
-                        .put("catalog_sales", catalogSalesSchema)
-                        .put("customer", customerSchema)
-                        .put("customer_address", customerAddressSchema)
-                        .put("customer_demographics", customerDemographicsSchema)
-                        .put("date_dim", dateDimSchema)
-                        .put("household_demographics", householdDemographicsSchema)
-                        .put("income_band", incomeBandSchema)
-                        .put("inventory", inventorySchema)
-                        .put("item", itemSchema)
-                        .put("promotion", promotionSchema)
-                        .put("reason", reasonSchema)
-                        .put("ship_mode", shipModeSchema)
-                        .put("store", storeSchema)
-                        .put("store_returns", storeReturnsSchema)
-                        .put("store_sales", storeSalesSchema)
-                        .put("time_dim", timeDimSchema)
-                        .put("warehouse", warehouseSchema)
-                        .put("web_page", webPageSchema)
-                        .put("web_returns", webReturnsSchema)
-                        .put("web_sales", webSalesSchema)
-                        .put("web_site", webSiteSchema)
-                        .build();
-        return immutableSchemaMap;
-    }
+        schemaBuilder.addNullableField(name, fieldType);
+      }
 
-    public static Schema getCallCenterSchema() { return callCenterSchema; }
-
-    public static Schema getCatalogPageSchema() { return catalogPageSchema; }
-
-    public static Schema getCatalogReturnsSchema() { return catalogReturnsSchema; }
-
-    public static Schema getCatalogSalesSchema() { return catalogSalesSchema; }
-
-    public static Schema getCustomerSchema() { return customerSchema; }
-
-    public static Schema getCustomerAddressSchema() { return customerAddressSchema; }
-
-    public static Schema getCustomerDemographicsSchema() { return customerDemographicsSchema; }
-
-    public static Schema getDateDimSchema() { return dateDimSchema; }
-
-    public static Schema getHouseholdDemographicsSchema() { return householdDemographicsSchema; }
-
-    public static Schema getIncomeBandSchema() { return incomeBandSchema; }
-
-    public static Schema getInventorySchema() { return inventorySchema; }
-
-    public static Schema getItemSchema() { return itemSchema; }
-
-    public static Schema getPromotionSchema() { return promotionSchema; }
-
-    public static Schema getReasonSchema() { return reasonSchema; }
-
-    public static Schema getShipModeSchema() { return shipModeSchema; }
-
-    public static Schema getStoreSchema() { return storeSchema; }
-
-    public static Schema getStoreReturnsSchema() { return storeReturnsSchema; }
-
-    public static Schema getStoreSalesSchema() { return storeSalesSchema; }
-
-    public static Schema getTimeDimSchema() { return timeDimSchema; }
-
-    public static Schema getWarehouseSchema() { return warehouseSchema; }
-
-    public static Schema getWebpageSchema() { return webPageSchema; }
-
-    public static Schema getWebReturnsSchema() { return webReturnsSchema; }
-
-    public static Schema getWebSalesSchema() { return webSalesSchema; }
-
-    public static Schema getWebSiteSchema() { return webSiteSchema; }
-
-    private static Schema callCenterSchema =
-            Schema.builder()
-                    .addField("cc_call_center_sk", Schema.FieldType.INT64)
-                    .addField("cc_call_center_id", Schema.FieldType.STRING)
-                    .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cc_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_class", Schema.FieldType.STRING)
-                    .addNullableField("cc_employees", Schema.FieldType.INT64)
-                    .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
-                    .addNullableField("cc_hours", Schema.FieldType.STRING)
-                    .addNullableField("cc_manager", Schema.FieldType.STRING)
-                    .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
-                    .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
-                    .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
-                    .addNullableField("cc_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("cc_division", Schema.FieldType.INT64)
-                    .addNullableField("cc_division_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_company", Schema.FieldType.INT64)
-                    .addNullableField("cc_company_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_number", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_type", Schema.FieldType.STRING)
-                    .addNullableField("cc_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("cc_city", Schema.FieldType.STRING)
-                    .addNullableField("cc_county", Schema.FieldType.STRING)
-                    .addNullableField("cc_state", Schema.FieldType.STRING)
-                    .addNullableField("cc_zip", Schema.FieldType.STRING)
-                    .addNullableField("cc_country", Schema.FieldType.STRING)
-                    .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema catalogPageSchema =
-            Schema.builder()
-                    .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
-                    .addField("cp_catalog_page_id", Schema.FieldType.STRING)
-                    .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cp_department", Schema.FieldType.STRING)
-                    .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
-                    .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
-                    .addNullableField("cp_description", Schema.FieldType.STRING)
-                    .addNullableField("cp_type", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema catalogReturnsSchema =
-            Schema.builder()
-                    .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
-                    .addField("cr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
-                    .addField("cr_order_number", Schema.FieldType.INT64)
-                    .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema catalogSalesSchema =
-            Schema.builder()
-                    .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
-                    .addField("cs_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
-                    .addField("cs_order_number", Schema.FieldType.INT64)
-                    .addNullableField("cs_quantity", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema customerSchema =
-            Schema.builder()
-                    .addField("c_customer_sk", Schema.FieldType.INT64)
-                    .addField("c_customer_id", Schema.FieldType.STRING)
-                    .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_salutation", Schema.FieldType.STRING)
-                    .addNullableField("c_first_name", Schema.FieldType.STRING)
-                    .addNullableField("c_last_name", Schema.FieldType.STRING)
-                    .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
-                    .addNullableField("c_birth_day", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_month", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_year", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_country", Schema.FieldType.STRING)
-                    .addNullableField("c_login", Schema.FieldType.STRING)
-                    .addNullableField("c_email_address", Schema.FieldType.STRING)
-                    .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema customerAddressSchema =
-            Schema.builder()
-                    .addField("ca_address_sk", Schema.FieldType.INT64)
-                    .addField("ca_address_id", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_number", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_name", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_type", Schema.FieldType.STRING)
-                    .addNullableField("ca_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("ca_city", Schema.FieldType.STRING)
-                    .addNullableField("ca_county", Schema.FieldType.STRING)
-                    .addNullableField("ca_state", Schema.FieldType.STRING)
-                    .addNullableField("ca_zip", Schema.FieldType.STRING)
-                    .addNullableField("ca_country", Schema.FieldType.STRING)
-                    .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("ca_location_type", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema customerDemographicsSchema =
-            Schema.builder()
-                    .addField("cd_demo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cd_gender", Schema.FieldType.STRING)
-                    .addNullableField("cd_marital_status", Schema.FieldType.STRING)
-                    .addNullableField("cd_education_status", Schema.FieldType.STRING)
-                    .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
-                    .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
-                    .addNullableField("cd_dep_count", Schema.FieldType.INT64)
-                    .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
-                    .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema dateDimSchema =
-            Schema.builder()
-                    .addField("d_date_sk", Schema.FieldType.INT64)
-                    .addField("d_date_id", Schema.FieldType.STRING)
-                    .addNullableField("d_date", Schema.FieldType.STRING)
-                    .addNullableField("d_month_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_week_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_year", Schema.FieldType.INT64)
-                    .addNullableField("d_dow", Schema.FieldType.INT64)
-                    .addNullableField("d_moy", Schema.FieldType.INT64)
-                    .addNullableField("d_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_qoy", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_year", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_day_name", Schema.FieldType.STRING)
-                    .addNullableField("d_quarter_name", Schema.FieldType.STRING)
-                    .addNullableField("d_holiday", Schema.FieldType.STRING)
-                    .addNullableField("d_weekend", Schema.FieldType.STRING)
-                    .addNullableField("d_following_holiday", Schema.FieldType.STRING)
-                    .addNullableField("d_first_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_last_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
-                    .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
-                    .addNullableField("d_current_day", Schema.FieldType.STRING)
-                    .addNullableField("d_current_week", Schema.FieldType.STRING)
-                    .addNullableField("d_current_month", Schema.FieldType.STRING)
-                    .addNullableField("d_current_quarter", Schema.FieldType.STRING)
-                    .addNullableField("d_current_year", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema householdDemographicsSchema =
-            Schema.builder()
-                    .addField("hd_demo_sk", Schema.FieldType.INT64)
-                    .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
-                    .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
-                    .addNullableField("hd_dep_count", Schema.FieldType.INT64)
-                    .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema incomeBandSchema =
-            Schema.builder()
-                    .addField("ib_income_band_sk", Schema.FieldType.INT64)
-                    .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
-                    .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema inventorySchema =
-            Schema.builder()
-                    .addField("inv_date_sk", Schema.FieldType.INT32)
-                    .addField("inv_item_sk", Schema.FieldType.INT32)
-                    .addField("inv_warehouse_sk", Schema.FieldType.INT32)
-                    .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
-                    .build();
-
-    private static Schema itemSchema =
-            Schema.builder()
-                    .addField("i_item_sk", Schema.FieldType.INT64)
-                    .addField("i_item_id", Schema.FieldType.STRING)
-                    .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("i_item_desc", Schema.FieldType.STRING)
-                    .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("i_brand_id", Schema.FieldType.INT64)
-                    .addNullableField("i_brand", Schema.FieldType.STRING)
-                    .addNullableField("i_class_id", Schema.FieldType.INT64)
-                    .addNullableField("i_class", Schema.FieldType.STRING)
-                    .addNullableField("i_category_id", Schema.FieldType.INT64)
-                    .addNullableField("i_category", Schema.FieldType.STRING)
-                    .addNullableField("i_manufact_id", Schema.FieldType.INT64)
-                    .addNullableField("i_manufact", Schema.FieldType.STRING)
-                    .addNullableField("i_size", Schema.FieldType.STRING)
-                    .addNullableField("i_formulation", Schema.FieldType.STRING)
-                    .addNullableField("i_color", Schema.FieldType.STRING)
-                    .addNullableField("i_units", Schema.FieldType.STRING)
-                    .addNullableField("i_container", Schema.FieldType.STRING)
-                    .addNullableField("i_manager_id", Schema.FieldType.INT64)
-                    .addNullableField("i_product_name", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema promotionSchema =
-            Schema.builder()
-                    .addField("p_promo_sk", Schema.FieldType.INT64)
-                    .addField("p_promo_id", Schema.FieldType.STRING)
-                    .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("p_response_target", Schema.FieldType.INT64)
-                    .addNullableField("p_promo_name", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_email", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_tv", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_radio", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_press", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_event", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_demo", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_details", Schema.FieldType.STRING)
-                    .addNullableField("p_purpose", Schema.FieldType.STRING)
-                    .addNullableField("p_discount_active", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema reasonSchema =
-            Schema.builder()
-                    .addField("r_reason_sk", Schema.FieldType.INT64)
-                    .addField("r_reason_id", Schema.FieldType.STRING)
-                    .addNullableField("r_reason_desc", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema shipModeSchema =
-            Schema.builder()
-                    .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
-                    .addField("sm_ship_mode_id", Schema.FieldType.STRING)
-                    .addNullableField("sm_type", Schema.FieldType.STRING)
-                    .addNullableField("sm_code", Schema.FieldType.STRING)
-                    .addNullableField("sm_carrier", Schema.FieldType.STRING)
-                    .addNullableField("sm_contract", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema storeSchema =
-            Schema.builder()
-                    .addField("s_store_sk", Schema.FieldType.INT64)
-                    .addField("s_store_id", Schema.FieldType.STRING)
-                    .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("s_store_name", Schema.FieldType.STRING)
-                    .addNullableField("s_number_employees", Schema.FieldType.INT64)
-                    .addNullableField("s_floor_space", Schema.FieldType.INT64)
-                    .addNullableField("s_hours", Schema.FieldType.STRING)
-                    .addNullableField("S_manager", Schema.FieldType.STRING)
-                    .addNullableField("S_market_id", Schema.FieldType.INT64)
-                    .addNullableField("S_geography_class", Schema.FieldType.STRING)
-                    .addNullableField("S_market_desc", Schema.FieldType.STRING)
-                    .addNullableField("s_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("s_division_id", Schema.FieldType.INT64)
-                    .addNullableField("s_division_name", Schema.FieldType.STRING)
-                    .addNullableField("s_company_id", Schema.FieldType.INT64)
-                    .addNullableField("s_company_name", Schema.FieldType.STRING)
-                    .addNullableField("s_street_number", Schema.FieldType.STRING)
-                    .addNullableField("s_street_name", Schema.FieldType.STRING)
-                    .addNullableField("s_street_type", Schema.FieldType.STRING)
-                    .addNullableField("s_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("s_city", Schema.FieldType.STRING)
-                    .addNullableField("s_county", Schema.FieldType.STRING)
-                    .addNullableField("s_state", Schema.FieldType.STRING)
-                    .addNullableField("s_zip", Schema.FieldType.STRING)
-                    .addNullableField("s_country", Schema.FieldType.STRING)
-                    .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema storeReturnsSchema =
-            Schema.builder()
-                    .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
-                    .addField("sr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_store_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
-                    .addField("sr_ticket_number", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema storeSalesSchema =
-            Schema.builder()
-                    .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
-                    .addField("ss_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_store_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
-                    .addField("ss_ticket_number", Schema.FieldType.INT64)
-                    .addNullableField("ss_quantity", Schema.FieldType.INT64)
-                    .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema timeDimSchema =
-            Schema.builder()
-                    .addField("t_time_sk", Schema.FieldType.INT64)
-                    .addField("t_time_id", Schema.FieldType.STRING)
-                    .addNullableField("t_time", Schema.FieldType.INT64)
-                    .addNullableField("t_hour", Schema.FieldType.INT64)
-                    .addNullableField("t_minute", Schema.FieldType.INT64)
-                    .addNullableField("t_second", Schema.FieldType.INT64)
-                    .addNullableField("t_am_pm", Schema.FieldType.STRING)
-                    .addNullableField("t_shift", Schema.FieldType.STRING)
-                    .addNullableField("t_sub_shift", Schema.FieldType.STRING)
-                    .addNullableField("t_meal_time", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema warehouseSchema =
-            Schema.builder()
-                    .addField("w_warehouse_sk", Schema.FieldType.INT64)
-                    .addField("w_warehouse_id", Schema.FieldType.STRING)
-                    .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
-                    .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
-                    .addNullableField("w_street_number", Schema.FieldType.STRING)
-                    .addNullableField("w_street_name", Schema.FieldType.STRING)
-                    .addNullableField("w_street_type", Schema.FieldType.STRING)
-                    .addNullableField("w_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("w_city", Schema.FieldType.STRING)
-                    .addNullableField("w_county", Schema.FieldType.STRING)
-                    .addNullableField("w_state", Schema.FieldType.STRING)
-                    .addNullableField("w_zip", Schema.FieldType.STRING)
-                    .addNullableField("w_country", Schema.FieldType.STRING)
-                    .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webPageSchema =
-            Schema.builder()
-                    .addField("wp_web_page_sk", Schema.FieldType.INT64)
-                    .addField("wp_web_page_id", Schema.FieldType.STRING)
-                    .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
-                    .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_url", Schema.FieldType.STRING)
-                    .addNullableField("wp_type", Schema.FieldType.STRING)
-                    .addNullableField("wp_char_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_link_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_image_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema webReturnsSchema =
-            Schema.builder()
-                    .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
-                    .addField("wr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
-                    .addField("wr_order_number", Schema.FieldType.INT64)
-                    .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webSalesSchema =
-            Schema.builder()
-                    .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
-                    .addField("ws_item_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
-                    .addField("ws_order_number", Schema.FieldType.INT64)
-                    .addNullableField("ws_quantity", Schema.FieldType.INT32)
-                    .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webSiteSchema =
-            Schema.builder()
-                    .addField("web_site_sk", Schema.FieldType.STRING)
-                    .addField("web_site_id", Schema.FieldType.STRING)
-                    .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("web_name", Schema.FieldType.STRING)
-                    .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("web_class", Schema.FieldType.STRING)
-                    .addNullableField("web_manager", Schema.FieldType.STRING)
-                    .addNullableField("web_mkt_id", Schema.FieldType.INT32)
-                    .addNullableField("web_mkt_class", Schema.FieldType.STRING)
-                    .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
-                    .addNullableField("web_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("web_company_id", Schema.FieldType.INT32)
-                    .addNullableField("web_company_name", Schema.FieldType.STRING)
-                    .addNullableField("web_street_number", Schema.FieldType.STRING)
-                    .addNullableField("web_street_name", Schema.FieldType.STRING)
-                    .addNullableField("web_street_type", Schema.FieldType.STRING)
-                    .addNullableField("web_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("web_city", Schema.FieldType.STRING)
-                    .addNullableField("web_county", Schema.FieldType.STRING)
-                    .addNullableField("web_state", Schema.FieldType.STRING)
-                    .addNullableField("web_zip", Schema.FieldType.STRING)
-                    .addNullableField("web_country", Schema.FieldType.STRING)
-                    .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
+      Schema tableSchema = schemaBuilder.build();
+      schemaMap.put(tableName, tableSchema);
+    }
+    return schemaMap;
+  }
+
+  /**
+   * Returns all TPC-DS table schemas as defined in the official TPC-DS specification. Some fields
+   * are declared non-nullable.
+   *
+   * @return A map of all TPC-DS table schemas, keyed by table name.
+   */
+  public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
+    ImmutableMap<String, Schema> immutableSchemaMap =
+        ImmutableMap.<String, Schema>builder()
+            .put("call_center", callCenterSchema)
+            .put("catalog_page", catalogPageSchema)
+            .put("catalog_returns", catalogReturnsSchema)
+            .put("catalog_sales", catalogSalesSchema)
+            .put("customer", customerSchema)
+            .put("customer_address", customerAddressSchema)
+            .put("customer_demographics", customerDemographicsSchema)
+            .put("date_dim", dateDimSchema)
+            .put("household_demographics", householdDemographicsSchema)
+            .put("income_band", incomeBandSchema)
+            .put("inventory", inventorySchema)
+            .put("item", itemSchema)
+            .put("promotion", promotionSchema)
+            .put("reason", reasonSchema)
+            .put("ship_mode", shipModeSchema)
+            .put("store", storeSchema)
+            .put("store_returns", storeReturnsSchema)
+            .put("store_sales", storeSalesSchema)
+            .put("time_dim", timeDimSchema)
+            .put("warehouse", warehouseSchema)
+            .put("web_page", webPageSchema)
+            .put("web_returns", webReturnsSchema)
+            .put("web_sales", webSalesSchema)
+            .put("web_site", webSiteSchema)
+            .build();
+    return immutableSchemaMap;
+  }
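
The immutable snapshot returned by getTpcdsSchemasImmutableMap() above can be sketched with plain JDK collections (a hypothetical stand-alone example, not part of the commit; the String values stand in for the Beam Schema objects):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaMapSketch {
  // Builds a mutable map, then freezes it -- the same pattern as
  // ImmutableMap.<String, Schema>builder()...build() in the class above.
  static Map<String, String> buildSchemaMap() {
    Map<String, String> schemas = new LinkedHashMap<>();
    schemas.put("store_sales", "storeSalesSchema");
    schemas.put("web_sales", "webSalesSchema");
    schemas.put("item", "itemSchema");
    // Map.copyOf (JDK 10+) returns an unmodifiable snapshot of the entries.
    return Map.copyOf(schemas);
  }

  public static void main(String[] args) {
    Map<String, String> schemas = buildSchemaMap();
    System.out.println(schemas.containsKey("item")); // prints "true"
    try {
      schemas.put("reason", "reasonSchema");
    } catch (UnsupportedOperationException e) {
      System.out.println("map is immutable"); // mutation is rejected
    }
  }
}
```

Returning an unmodifiable snapshot lets callers share the schema map freely (including across threads) without defensive copies.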
+
+  public static Schema getCallCenterSchema() {
+    return callCenterSchema;
+  }
+
+  public static Schema getCatalogPageSchema() {
+    return catalogPageSchema;
+  }
+
+  public static Schema getCatalogReturnsSchema() {
+    return catalogReturnsSchema;
+  }
+
+  public static Schema getCatalogSalesSchema() {
+    return catalogSalesSchema;
+  }
+
+  public static Schema getCustomerSchema() {
+    return customerSchema;
+  }
+
+  public static Schema getCustomerAddressSchema() {
+    return customerAddressSchema;
+  }
+
+  public static Schema getCustomerDemographicsSchema() {
+    return customerDemographicsSchema;
+  }
+
+  public static Schema getDateDimSchema() {
+    return dateDimSchema;
+  }
+
+  public static Schema getHouseholdDemographicsSchema() {
+    return householdDemographicsSchema;
+  }
+
+  public static Schema getIncomeBandSchema() {
+    return incomeBandSchema;
+  }
+
+  public static Schema getInventorySchema() {
+    return inventorySchema;
+  }
+
+  public static Schema getItemSchema() {
+    return itemSchema;
+  }
+
+  public static Schema getPromotionSchema() {
+    return promotionSchema;
+  }
+
+  public static Schema getReasonSchema() {
+    return reasonSchema;
+  }
+
+  public static Schema getShipModeSchema() {
+    return shipModeSchema;
+  }
+
+  public static Schema getStoreSchema() {
+    return storeSchema;
+  }
+
+  public static Schema getStoreReturnsSchema() {
+    return storeReturnsSchema;
+  }
+
+  public static Schema getStoreSalesSchema() {
+    return storeSalesSchema;
+  }
+
+  public static Schema getTimeDimSchema() {
+    return timeDimSchema;
+  }
+
+  public static Schema getWarehouseSchema() {
+    return warehouseSchema;
+  }
+
+  public static Schema getWebPageSchema() {
+    return webPageSchema;
+  }
+
+  public static Schema getWebReturnsSchema() {
+    return webReturnsSchema;
+  }
+
+  public static Schema getWebSalesSchema() {
+    return webSalesSchema;
+  }
+
+  public static Schema getWebSiteSchema() {
+    return webSiteSchema;
+  }
+
+  private static Schema callCenterSchema =
+      Schema.builder()
+          .addField("cc_call_center_sk", Schema.FieldType.INT64)
+          .addField("cc_call_center_id", Schema.FieldType.STRING)
+          .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cc_name", Schema.FieldType.STRING)
+          .addNullableField("cc_class", Schema.FieldType.STRING)
+          .addNullableField("cc_employees", Schema.FieldType.INT64)
+          .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+          .addNullableField("cc_hours", Schema.FieldType.STRING)
+          .addNullableField("cc_manager", Schema.FieldType.STRING)
+          .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+          .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+          .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+          .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+          .addNullableField("cc_division", Schema.FieldType.INT64)
+          .addNullableField("cc_division_name", Schema.FieldType.STRING)
+          .addNullableField("cc_company", Schema.FieldType.INT64)
+          .addNullableField("cc_company_name", Schema.FieldType.STRING)
+          .addNullableField("cc_street_number", Schema.FieldType.STRING)
+          .addNullableField("cc_street_name", Schema.FieldType.STRING)
+          .addNullableField("cc_street_type", Schema.FieldType.STRING)
+          .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+          .addNullableField("cc_city", Schema.FieldType.STRING)
+          .addNullableField("cc_county", Schema.FieldType.STRING)
+          .addNullableField("cc_state", Schema.FieldType.STRING)
+          .addNullableField("cc_zip", Schema.FieldType.STRING)
+          .addNullableField("cc_country", Schema.FieldType.STRING)
+          .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema catalogPageSchema =
+      Schema.builder()
+          .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
+          .addField("cp_catalog_page_id", Schema.FieldType.STRING)
+          .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cp_department", Schema.FieldType.STRING)
+          .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+          .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+          .addNullableField("cp_description", Schema.FieldType.STRING)
+          .addNullableField("cp_type", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema catalogReturnsSchema =
+      Schema.builder()
+          .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
+          .addField("cr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
+          .addField("cr_order_number", Schema.FieldType.INT64)
+          .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema catalogSalesSchema =
+      Schema.builder()
+          .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
+          .addField("cs_item_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
+          .addField("cs_order_number", Schema.FieldType.INT64)
+          .addNullableField("cs_quantity", Schema.FieldType.INT64)
+          .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema customerSchema =
+      Schema.builder()
+          .addField("c_customer_sk", Schema.FieldType.INT64)
+          .addField("c_customer_id", Schema.FieldType.STRING)
+          .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
+          .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
+          .addNullableField("c_salutation", Schema.FieldType.STRING)
+          .addNullableField("c_first_name", Schema.FieldType.STRING)
+          .addNullableField("c_last_name", Schema.FieldType.STRING)
+          .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
+          .addNullableField("c_birth_day", Schema.FieldType.INT64)
+          .addNullableField("c_birth_month", Schema.FieldType.INT64)
+          .addNullableField("c_birth_year", Schema.FieldType.INT64)
+          .addNullableField("c_birth_country", Schema.FieldType.STRING)
+          .addNullableField("c_login", Schema.FieldType.STRING)
+          .addNullableField("c_email_address", Schema.FieldType.STRING)
+          .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema customerAddressSchema =
+      Schema.builder()
+          .addField("ca_address_sk", Schema.FieldType.INT64)
+          .addField("ca_address_id", Schema.FieldType.STRING)
+          .addNullableField("ca_street_number", Schema.FieldType.STRING)
+          .addNullableField("ca_street_name", Schema.FieldType.STRING)
+          .addNullableField("ca_street_type", Schema.FieldType.STRING)
+          .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+          .addNullableField("ca_city", Schema.FieldType.STRING)
+          .addNullableField("ca_county", Schema.FieldType.STRING)
+          .addNullableField("ca_state", Schema.FieldType.STRING)
+          .addNullableField("ca_zip", Schema.FieldType.STRING)
+          .addNullableField("ca_country", Schema.FieldType.STRING)
+          .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("ca_location_type", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema customerDemographicsSchema =
+      Schema.builder()
+          .addField("cd_demo_sk", Schema.FieldType.INT64)
+          .addNullableField("cd_gender", Schema.FieldType.STRING)
+          .addNullableField("cd_marital_status", Schema.FieldType.STRING)
+          .addNullableField("cd_education_status", Schema.FieldType.STRING)
+          .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
+          .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
+          .addNullableField("cd_dep_count", Schema.FieldType.INT64)
+          .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
+          .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema dateDimSchema =
+      Schema.builder()
+          .addField("d_date_sk", Schema.FieldType.INT64)
+          .addField("d_date_id", Schema.FieldType.STRING)
+          .addNullableField("d_date", Schema.FieldType.STRING)
+          .addNullableField("d_month_seq", Schema.FieldType.INT64)
+          .addNullableField("d_week_seq", Schema.FieldType.INT64)
+          .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
+          .addNullableField("d_year", Schema.FieldType.INT64)
+          .addNullableField("d_dow", Schema.FieldType.INT64)
+          .addNullableField("d_moy", Schema.FieldType.INT64)
+          .addNullableField("d_dom", Schema.FieldType.INT64)
+          .addNullableField("d_qoy", Schema.FieldType.INT64)
+          .addNullableField("d_fy_year", Schema.FieldType.INT64)
+          .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
+          .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
+          .addNullableField("d_day_name", Schema.FieldType.STRING)
+          .addNullableField("d_quarter_name", Schema.FieldType.STRING)
+          .addNullableField("d_holiday", Schema.FieldType.STRING)
+          .addNullableField("d_weekend", Schema.FieldType.STRING)
+          .addNullableField("d_following_holiday", Schema.FieldType.STRING)
+          .addNullableField("d_first_dom", Schema.FieldType.INT64)
+          .addNullableField("d_last_dom", Schema.FieldType.INT64)
+          .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
+          .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
+          .addNullableField("d_current_day", Schema.FieldType.STRING)
+          .addNullableField("d_current_week", Schema.FieldType.STRING)
+          .addNullableField("d_current_month", Schema.FieldType.STRING)
+          .addNullableField("d_current_quarter", Schema.FieldType.STRING)
+          .addNullableField("d_current_year", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema householdDemographicsSchema =
+      Schema.builder()
+          .addField("hd_demo_sk", Schema.FieldType.INT64)
+          .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
+          .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
+          .addNullableField("hd_dep_count", Schema.FieldType.INT64)
+          .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema incomeBandSchema =
+      Schema.builder()
+          .addField("ib_income_band_sk", Schema.FieldType.INT64)
+          .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
+          .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema inventorySchema =
+      Schema.builder()
+          .addField("inv_date_sk", Schema.FieldType.INT32)
+          .addField("inv_item_sk", Schema.FieldType.INT32)
+          .addField("inv_warehouse_sk", Schema.FieldType.INT32)
+          .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
+          .build();
+
+  private static Schema itemSchema =
+      Schema.builder()
+          .addField("i_item_sk", Schema.FieldType.INT64)
+          .addField("i_item_id", Schema.FieldType.STRING)
+          .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("i_item_desc", Schema.FieldType.STRING)
+          .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
+          .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("i_brand_id", Schema.FieldType.INT64)
+          .addNullableField("i_brand", Schema.FieldType.STRING)
+          .addNullableField("i_class_id", Schema.FieldType.INT64)
+          .addNullableField("i_class", Schema.FieldType.STRING)
+          .addNullableField("i_category_id", Schema.FieldType.INT64)
+          .addNullableField("i_category", Schema.FieldType.STRING)
+          .addNullableField("i_manufact_id", Schema.FieldType.INT64)
+          .addNullableField("i_manufact", Schema.FieldType.STRING)
+          .addNullableField("i_size", Schema.FieldType.STRING)
+          .addNullableField("i_formulation", Schema.FieldType.STRING)
+          .addNullableField("i_color", Schema.FieldType.STRING)
+          .addNullableField("i_units", Schema.FieldType.STRING)
+          .addNullableField("i_container", Schema.FieldType.STRING)
+          .addNullableField("i_manager_id", Schema.FieldType.INT64)
+          .addNullableField("i_product_name", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema promotionSchema =
+      Schema.builder()
+          .addField("p_promo_sk", Schema.FieldType.INT64)
+          .addField("p_promo_id", Schema.FieldType.STRING)
+          .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
+          .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
+          .addNullableField("p_item_sk", Schema.FieldType.INT64)
+          .addNullableField("p_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("p_response_target", Schema.FieldType.INT64)
+          .addNullableField("p_promo_name", Schema.FieldType.STRING)
+          .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
+          .addNullableField("p_channel_email", Schema.FieldType.STRING)
+          .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
+          .addNullableField("p_channel_tv", Schema.FieldType.STRING)
+          .addNullableField("p_channel_radio", Schema.FieldType.STRING)
+          .addNullableField("p_channel_press", Schema.FieldType.STRING)
+          .addNullableField("p_channel_event", Schema.FieldType.STRING)
+          .addNullableField("p_channel_demo", Schema.FieldType.STRING)
+          .addNullableField("p_channel_details", Schema.FieldType.STRING)
+          .addNullableField("p_purpose", Schema.FieldType.STRING)
+          .addNullableField("p_discount_active", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema reasonSchema =
+      Schema.builder()
+          .addField("r_reason_sk", Schema.FieldType.INT64)
+          .addField("r_reason_id", Schema.FieldType.STRING)
+          .addNullableField("r_reason_desc", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema shipModeSchema =
+      Schema.builder()
+          .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
+          .addField("sm_ship_mode_id", Schema.FieldType.STRING)
+          .addNullableField("sm_type", Schema.FieldType.STRING)
+          .addNullableField("sm_code", Schema.FieldType.STRING)
+          .addNullableField("sm_carrier", Schema.FieldType.STRING)
+          .addNullableField("sm_contract", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema storeSchema =
+      Schema.builder()
+          .addField("s_store_sk", Schema.FieldType.INT64)
+          .addField("s_store_id", Schema.FieldType.STRING)
+          .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
+          .addNullableField("s_store_name", Schema.FieldType.STRING)
+          .addNullableField("s_number_employees", Schema.FieldType.INT64)
+          .addNullableField("s_floor_space", Schema.FieldType.INT64)
+          .addNullableField("s_hours", Schema.FieldType.STRING)
+          .addNullableField("s_manager", Schema.FieldType.STRING)
+          .addNullableField("s_market_id", Schema.FieldType.INT64)
+          .addNullableField("s_geography_class", Schema.FieldType.STRING)
+          .addNullableField("s_market_desc", Schema.FieldType.STRING)
+          .addNullableField("s_market_manager", Schema.FieldType.STRING)
+          .addNullableField("s_division_id", Schema.FieldType.INT64)
+          .addNullableField("s_division_name", Schema.FieldType.STRING)
+          .addNullableField("s_company_id", Schema.FieldType.INT64)
+          .addNullableField("s_company_name", Schema.FieldType.STRING)
+          .addNullableField("s_street_number", Schema.FieldType.STRING)
+          .addNullableField("s_street_name", Schema.FieldType.STRING)
+          .addNullableField("s_street_type", Schema.FieldType.STRING)
+          .addNullableField("s_suite_number", Schema.FieldType.STRING)
+          .addNullableField("s_city", Schema.FieldType.STRING)
+          .addNullableField("s_county", Schema.FieldType.STRING)
+          .addNullableField("s_state", Schema.FieldType.STRING)
+          .addNullableField("s_zip", Schema.FieldType.STRING)
+          .addNullableField("s_country", Schema.FieldType.STRING)
+          .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema storeReturnsSchema =
+      Schema.builder()
+          .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
+          .addField("sr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_store_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
+          .addField("sr_ticket_number", Schema.FieldType.INT64)
+          .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema storeSalesSchema =
+      Schema.builder()
+          .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
+          .addField("ss_item_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_store_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
+          .addField("ss_ticket_number", Schema.FieldType.INT64)
+          .addNullableField("ss_quantity", Schema.FieldType.INT64)
+          .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema timeDimSchema =
+      Schema.builder()
+          .addField("t_time_sk", Schema.FieldType.INT64)
+          .addField("t_time_id", Schema.FieldType.STRING)
+          .addNullableField("t_time", Schema.FieldType.INT64)
+          .addNullableField("t_hour", Schema.FieldType.INT64)
+          .addNullableField("t_minute", Schema.FieldType.INT64)
+          .addNullableField("t_second", Schema.FieldType.INT64)
+          .addNullableField("t_am_pm", Schema.FieldType.STRING)
+          .addNullableField("t_shift", Schema.FieldType.STRING)
+          .addNullableField("t_sub_shift", Schema.FieldType.STRING)
+          .addNullableField("t_meal_time", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema warehouseSchema =
+      Schema.builder()
+          .addField("w_warehouse_sk", Schema.FieldType.INT64)
+          .addField("w_warehouse_id", Schema.FieldType.STRING)
+          .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
+          .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
+          .addNullableField("w_street_number", Schema.FieldType.STRING)
+          .addNullableField("w_street_name", Schema.FieldType.STRING)
+          .addNullableField("w_street_type", Schema.FieldType.STRING)
+          .addNullableField("w_suite_number", Schema.FieldType.STRING)
+          .addNullableField("w_city", Schema.FieldType.STRING)
+          .addNullableField("w_county", Schema.FieldType.STRING)
+          .addNullableField("w_state", Schema.FieldType.STRING)
+          .addNullableField("w_zip", Schema.FieldType.STRING)
+          .addNullableField("w_country", Schema.FieldType.STRING)
+          .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webPageSchema =
+      Schema.builder()
+          .addField("wp_web_page_sk", Schema.FieldType.INT64)
+          .addField("wp_web_page_id", Schema.FieldType.STRING)
+          .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
+          .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_url", Schema.FieldType.STRING)
+          .addNullableField("wp_type", Schema.FieldType.STRING)
+          .addNullableField("wp_char_count", Schema.FieldType.INT64)
+          .addNullableField("wp_link_count", Schema.FieldType.INT64)
+          .addNullableField("wp_image_count", Schema.FieldType.INT64)
+          .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema webReturnsSchema =
+      Schema.builder()
+          .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
+          .addField("wr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
+          .addField("wr_order_number", Schema.FieldType.INT64)
+          .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webSalesSchema =
+      Schema.builder()
+          .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
+          .addField("ws_item_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
+          .addField("ws_order_number", Schema.FieldType.INT64)
+          .addNullableField("ws_quantity", Schema.FieldType.INT32)
+          .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webSiteSchema =
+      Schema.builder()
+          .addField("web_site_sk", Schema.FieldType.STRING)
+          .addField("web_site_id", Schema.FieldType.STRING)
+          .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("web_name", Schema.FieldType.STRING)
+          .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
+          .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
+          .addNullableField("web_class", Schema.FieldType.STRING)
+          .addNullableField("web_manager", Schema.FieldType.STRING)
+          .addNullableField("web_mkt_id", Schema.FieldType.INT32)
+          .addNullableField("web_mkt_class", Schema.FieldType.STRING)
+          .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
+          .addNullableField("web_market_manager", Schema.FieldType.STRING)
+          .addNullableField("web_company_id", Schema.FieldType.INT32)
+          .addNullableField("web_company_name", Schema.FieldType.STRING)
+          .addNullableField("web_street_number", Schema.FieldType.STRING)
+          .addNullableField("web_street_name", Schema.FieldType.STRING)
+          .addNullableField("web_street_type", Schema.FieldType.STRING)
+          .addNullableField("web_suite_number", Schema.FieldType.STRING)
+          .addNullableField("web_city", Schema.FieldType.STRING)
+          .addNullableField("web_county", Schema.FieldType.STRING)
+          .addNullableField("web_state", Schema.FieldType.STRING)
+          .addNullableField("web_zip", Schema.FieldType.STRING)
+          .addNullableField("web_country", Schema.FieldType.STRING)
+          .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
 }
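The schemas above mark most columns as nullable because TPC-DS generator output leaves optional fields empty. The sketch below is illustrative only, not the module's actual `CsvToRow` implementation: a stdlib-only example of how a delimited text line might be split into typed, possibly-null column values before being mapped onto such a schema. The sample column layout (warehouse key, id, name, square footage) is an assumption loosely modeled on `warehouseSchema`.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: splitting a delimited line into nullable columns,
// the kind of work a CSV-to-Row conversion must do for nullable schema fields.
public class NullableColumnParser {
  // Split on the delimiter, keeping empty fields and mapping "" to null.
  static List<String> splitNullable(String line, char delim) {
    List<String> out = new ArrayList<>();
    StringBuilder cur = new StringBuilder();
    for (int i = 0; i < line.length(); i++) {
      char c = line.charAt(i);
      if (c == delim) {
        out.add(cur.length() == 0 ? null : cur.toString());
        cur.setLength(0);
      } else {
        cur.append(c);
      }
    }
    out.add(cur.length() == 0 ? null : cur.toString());
    return out;
  }

  // Parse a value for an INT64-style column; an empty field means null.
  static Long parseLongOrNull(String field) {
    return field == null ? null : Long.valueOf(field);
  }

  public static void main(String[] args) {
    // Hypothetical line: sk | id | (missing name) | sq_ft
    List<String> cols = splitNullable("1|AAAAAAAABAAAAAAA||733", '|');
    System.out.println(cols.size());                  // 4
    System.out.println(cols.get(2));                  // null
    System.out.println(parseLongOrNull(cols.get(3))); // 733
  }
}
```

The point of mapping empty fields to `null` rather than `""` is that the schema distinguishes a missing nullable value from an empty string, and numeric parsing must skip missing fields entirely.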
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
similarity index 59%
copy from sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
copy to sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
index d1ddc9d..8c2f339 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
@@ -15,19 +15,5 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+/** TPC-DS test suite. */
 package org.apache.beam.sdk.tpcds;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
-
-/** {@link AutoService} registrar for {@link TpcdsOptions}. */
-@AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
-
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-        return ImmutableList.of(TpcdsOptions.class);
-    }
-}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
index 5696410..42f7d5b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
@@ -18,188 +18,193 @@
 package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
+
 import org.junit.Test;
 
 public class QueryReaderTest {
-    private final String headers = "-- Licensed to the Apache Software Foundation (ASF) under one\n" +
-            "-- or more contributor license agreements.  See the NOTICE file\n" +
-            "-- distributed with this work for additional information\n" +
-            "-- regarding copyright ownership.  The ASF licenses this file\n" +
-            "-- to you under the Apache License, Version 2.0 (the\n" +
-            "-- \"License\"); you may not use this file except in compliance\n" +
-            "-- with the License.  You may obtain a copy of the License at\n" +
-            "--\n" +
-            "--     http://www.apache.org/licenses/LICENSE-2.0\n" +
-            "--\n" +
-            "-- Unless required by applicable law or agreed to in writing, software\n" +
-            "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
-            "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
-            "-- See the License for the specific language governing permissions and\n" +
-            "-- limitations under the License.\n";
+  private final String headers =
+      "-- Licensed to the Apache Software Foundation (ASF) under one\n"
+          + "-- or more contributor license agreements.  See the NOTICE file\n"
+          + "-- distributed with this work for additional information\n"
+          + "-- regarding copyright ownership.  The ASF licenses this file\n"
+          + "-- to you under the Apache License, Version 2.0 (the\n"
+          + "-- \"License\"); you may not use this file except in compliance\n"
+          + "-- with the License.  You may obtain a copy of the License at\n"
+          + "--\n"
+          + "--     http://www.apache.org/licenses/LICENSE-2.0\n"
+          + "--\n"
+          + "-- Unless required by applicable law or agreed to in writing, software\n"
+          + "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+          + "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+          + "-- See the License for the specific language governing permissions and\n"
+          + "-- limitations under the License.\n";
 
-    @Test
-    public void testQuery3String() throws Exception {
-        String query3String = QueryReader.readQuery("query3");
-        String expected = "select  dt.d_year \n" +
-                "       ,item.i_brand_id brand_id \n" +
-                "       ,item.i_brand brand\n" +
-                "       ,sum(ss_ext_sales_price) sum_agg\n" +
-                " from  date_dim dt \n" +
-                "      ,store_sales\n" +
-                "      ,item\n" +
-                " where dt.d_date_sk = store_sales.ss_sold_date_sk\n" +
-                "   and store_sales.ss_item_sk = item.i_item_sk\n" +
-                "   and item.i_manufact_id = 436\n" +
-                "   and dt.d_moy=12\n" +
-                " group by dt.d_year\n" +
-                "      ,item.i_brand\n" +
-                "      ,item.i_brand_id\n" +
-                " order by dt.d_year\n" +
-                "         ,sum_agg desc\n" +
-                "         ,brand_id\n" +
-                " limit 100";
-        String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query3StringNoSpaces);
-    }
+  @Test
+  public void testQuery3String() throws Exception {
+    String query3String = QueryReader.readQuery("query3");
+    String expected =
+        "select  dt.d_year \n"
+            + "       ,item.i_brand_id brand_id \n"
+            + "       ,item.i_brand brand\n"
+            + "       ,sum(ss_ext_sales_price) sum_agg\n"
+            + " from  date_dim dt \n"
+            + "      ,store_sales\n"
+            + "      ,item\n"
+            + " where dt.d_date_sk = store_sales.ss_sold_date_sk\n"
+            + "   and store_sales.ss_item_sk = item.i_item_sk\n"
+            + "   and item.i_manufact_id = 436\n"
+            + "   and dt.d_moy=12\n"
+            + " group by dt.d_year\n"
+            + "      ,item.i_brand\n"
+            + "      ,item.i_brand_id\n"
+            + " order by dt.d_year\n"
+            + "         ,sum_agg desc\n"
+            + "         ,brand_id\n"
+            + " limit 100";
+    String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query3StringNoSpaces);
+  }
 
-    @Test
-    public void testQuery4String() throws Exception {
-        String query4String = QueryReader.readQuery("query4");
-        String expected = "with year_total as (\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n" +
-                "       ,'s' sale_type\n" +
-                " from customer\n" +
-                "     ,store_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = ss_customer_sk\n" +
-                "   and ss_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                " union all\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n" +
-                "       ,'c' sale_type\n" +
-                " from customer\n" +
-                "     ,catalog_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = cs_bill_customer_sk\n" +
-                "   and cs_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                "union all\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n" +
-                "       ,'w' sale_type\n" +
-                " from customer\n" +
-                "     ,web_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = ws_bill_customer_sk\n" +
-                "   and ws_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                "         )\n" +
-                "  select  \n" +
-                "                  t_s_secyear.customer_id\n" +
-                "                 ,t_s_secyear.customer_first_name\n" +
-                "                 ,t_s_secyear.customer_last_name\n" +
-                "                 ,t_s_secyear.customer_email_address\n" +
-                " from year_total t_s_firstyear\n" +
-                "     ,year_total t_s_secyear\n" +
-                "     ,year_total t_c_firstyear\n" +
-                "     ,year_total t_c_secyear\n" +
-                "     ,year_total t_w_firstyear\n" +
-                "     ,year_total t_w_secyear\n" +
-                " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_c_secyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_w_secyear.customer_id\n" +
-                "   and t_s_firstyear.sale_type = 's'\n" +
-                "   and t_c_firstyear.sale_type = 'c'\n" +
-                "   and t_w_firstyear.sale_type = 'w'\n" +
-                "   and t_s_secyear.sale_type = 's'\n" +
-                "   and t_c_secyear.sale_type = 'c'\n" +
-                "   and t_w_secyear.sale_type = 'w'\n" +
-                "   and t_s_firstyear.dyear =  2001\n" +
-                "   and t_s_secyear.dyear = 2001+1\n" +
-                "   and t_c_firstyear.dyear =  2001\n" +
-                "   and t_c_secyear.dyear =  2001+1\n" +
-                "   and t_w_firstyear.dyear = 2001\n" +
-                "   and t_w_secyear.dyear = 2001+1\n" +
-                "   and t_s_firstyear.year_total > 0\n" +
-                "   and t_c_firstyear.year_total > 0\n" +
-                "   and t_w_firstyear.year_total > 0\n" +
-                "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
-                "           > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n" +
-                "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
-                "           > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n" +
-                " order by t_s_secyear.customer_id\n" +
-                "         ,t_s_secyear.customer_first_name\n" +
-                "         ,t_s_secyear.customer_last_name\n" +
-                "         ,t_s_secyear.customer_email_address\n" +
-                "limit 100";
-        String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query4StringNoSpaces);
-    }
+  @Test
+  public void testQuery4String() throws Exception {
+    String query4String = QueryReader.readQuery("query4");
+    String expected =
+        "with year_total as (\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n"
+            + "       ,'s' sale_type\n"
+            + " from customer\n"
+            + "     ,store_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = ss_customer_sk\n"
+            + "   and ss_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + " union all\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n"
+            + "       ,'c' sale_type\n"
+            + " from customer\n"
+            + "     ,catalog_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = cs_bill_customer_sk\n"
+            + "   and cs_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + "union all\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n"
+            + "       ,'w' sale_type\n"
+            + " from customer\n"
+            + "     ,web_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = ws_bill_customer_sk\n"
+            + "   and ws_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + "         )\n"
+            + "  select  \n"
+            + "                  t_s_secyear.customer_id\n"
+            + "                 ,t_s_secyear.customer_first_name\n"
+            + "                 ,t_s_secyear.customer_last_name\n"
+            + "                 ,t_s_secyear.customer_email_address\n"
+            + " from year_total t_s_firstyear\n"
+            + "     ,year_total t_s_secyear\n"
+            + "     ,year_total t_c_firstyear\n"
+            + "     ,year_total t_c_secyear\n"
+            + "     ,year_total t_w_firstyear\n"
+            + "     ,year_total t_w_secyear\n"
+            + " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_c_secyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_w_secyear.customer_id\n"
+            + "   and t_s_firstyear.sale_type = 's'\n"
+            + "   and t_c_firstyear.sale_type = 'c'\n"
+            + "   and t_w_firstyear.sale_type = 'w'\n"
+            + "   and t_s_secyear.sale_type = 's'\n"
+            + "   and t_c_secyear.sale_type = 'c'\n"
+            + "   and t_w_secyear.sale_type = 'w'\n"
+            + "   and t_s_firstyear.dyear =  2001\n"
+            + "   and t_s_secyear.dyear = 2001+1\n"
+            + "   and t_c_firstyear.dyear =  2001\n"
+            + "   and t_c_secyear.dyear =  2001+1\n"
+            + "   and t_w_firstyear.dyear = 2001\n"
+            + "   and t_w_secyear.dyear = 2001+1\n"
+            + "   and t_s_firstyear.year_total > 0\n"
+            + "   and t_c_firstyear.year_total > 0\n"
+            + "   and t_w_firstyear.year_total > 0\n"
+            + "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+            + "           > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n"
+            + "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+            + "           > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n"
+            + " order by t_s_secyear.customer_id\n"
+            + "         ,t_s_secyear.customer_first_name\n"
+            + "         ,t_s_secyear.customer_last_name\n"
+            + "         ,t_s_secyear.customer_email_address\n"
+            + "limit 100";
+    String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query4StringNoSpaces);
+  }
 
-    @Test
-    public void testQuery55String() throws Exception {
-        String query55String = QueryReader.readQuery("query55");
-        String expected = "select  i_brand_id brand_id, i_brand brand,\n" +
-                " \tsum(ss_ext_sales_price) ext_price\n" +
-                " from date_dim, store_sales, item\n" +
-                " where d_date_sk = ss_sold_date_sk\n" +
-                " \tand ss_item_sk = i_item_sk\n" +
-                " \tand i_manager_id=36\n" +
-                " \tand d_moy=12\n" +
-                " \tand d_year=2001\n" +
-                " group by i_brand, i_brand_id\n" +
-                " order by ext_price desc, i_brand_id\n" +
-                "limit 100";
-        String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query55StringNoSpaces);
-    }
+  @Test
+  public void testQuery55String() throws Exception {
+    String query55String = QueryReader.readQuery("query55");
+    String expected =
+        "select  i_brand_id brand_id, i_brand brand,\n"
+            + " \tsum(ss_ext_sales_price) ext_price\n"
+            + " from date_dim, store_sales, item\n"
+            + " where d_date_sk = ss_sold_date_sk\n"
+            + " \tand ss_item_sk = i_item_sk\n"
+            + " \tand i_manager_id=36\n"
+            + " \tand d_moy=12\n"
+            + " \tand d_year=2001\n"
+            + " group by i_brand, i_brand_id\n"
+            + " order by ext_price desc, i_brand_id\n"
+            + "limit 100";
+    String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query55StringNoSpaces);
+  }
 }
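The tests above compare expected and actual queries only after stripping all whitespace with `replaceAll("\\s+", "")`, which is what lets the assertions survive the reflowed string concatenation in this commit. A minimal stdlib sketch of that comparison technique:

```java
// Sketch of the whitespace-insensitive comparison used by QueryReaderTest:
// collapsing every whitespace run makes the check robust to reformatted SQL,
// at the cost of also ignoring whitespace differences inside quoted literals
// (and of equating e.g. "ab" with "a b").
public class WhitespaceInsensitiveCompare {
  static boolean sameModuloWhitespace(String a, String b) {
    return a.replaceAll("\\s+", "").equals(b.replaceAll("\\s+", ""));
  }

  public static void main(String[] args) {
    String formatted = "select  dt.d_year\n from date_dim dt";
    String reflowed = "select dt.d_year from date_dim dt";
    System.out.println(sameModuloWhitespace(formatted, reflowed)); // true
  }
}
```

This trade-off is acceptable here because the tests only guard against the query text drifting, not against whitespace-sensitive content.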
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 7748bee..1d597f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -18,134 +18,160 @@
 package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
-import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-
+import org.junit.Test;
 
 public class TableSchemaJSONLoaderTest {
-    @Test
-    public void testStoreReturnsTable() throws Exception {
-        String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
-        String expected = "sr_returned_date_sk bigint,"
-                + "sr_return_time_sk bigint,"
-                + "sr_item_sk bigint,"
-                + "sr_customer_sk bigint,"
-                + "sr_cdemo_sk bigint,"
-                + "sr_hdemo_sk bigint,"
-                + "sr_addr_sk bigint,"
-                + "sr_store_sk bigint,"
-                + "sr_reason_sk bigint,"
-                + "sr_ticket_number bigint,"
-                + "sr_return_quantity bigint,"
-                + "sr_return_amt double,"
-                + "sr_return_tax double,"
-                + "sr_return_amt_inc_tax double,"
-                + "sr_fee double,"
-                + "sr_return_ship_cost double,"
-                + "sr_refunded_cash double,"
-                + "sr_reversed_charge double,"
-                + "sr_store_credit double,"
-                + "sr_net_loss double";
-        assertEquals(expected, storeReturnsSchemaString);
-    }
+  @Test
+  public void testStoreReturnsTable() throws Exception {
+    String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
+    String expected =
+        "sr_returned_date_sk bigint,"
+            + "sr_return_time_sk bigint,"
+            + "sr_item_sk bigint,"
+            + "sr_customer_sk bigint,"
+            + "sr_cdemo_sk bigint,"
+            + "sr_hdemo_sk bigint,"
+            + "sr_addr_sk bigint,"
+            + "sr_store_sk bigint,"
+            + "sr_reason_sk bigint,"
+            + "sr_ticket_number bigint,"
+            + "sr_return_quantity bigint,"
+            + "sr_return_amt double,"
+            + "sr_return_tax double,"
+            + "sr_return_amt_inc_tax double,"
+            + "sr_fee double,"
+            + "sr_return_ship_cost double,"
+            + "sr_refunded_cash double,"
+            + "sr_reversed_charge double,"
+            + "sr_store_credit double,"
+            + "sr_net_loss double";
+    assertEquals(expected, storeReturnsSchemaString);
+  }
 
-    @Test
-    public void testItemTable() throws Exception {
-        String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
-        String expected = "i_item_sk bigint,"
-                + "i_item_id varchar,"
-                + "i_rec_start_date varchar,"
-                + "i_rec_end_date varchar,"
-                + "i_item_desc varchar,"
-                + "i_current_price double,"
-                + "i_wholesale_cost double,"
-                + "i_brand_id bigint,"
-                + "i_brand varchar,"
-                + "i_class_id bigint,"
-                + "i_class varchar,"
-                + "i_category_id bigint,"
-                + "i_category varchar,"
-                + "i_manufact_id bigint,"
-                + "i_manufact varchar,"
-                + "i_size varchar,"
-                + "i_formulation varchar,"
-                + "i_color varchar,"
-                + "i_units varchar,"
-                + "i_container varchar,"
-                + "i_manager_id bigint,"
-                + "i_product_name varchar";
-        assertEquals(expected, itemSchemaString);
-    }
+  @Test
+  public void testItemTable() throws Exception {
+    String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
+    String expected =
+        "i_item_sk bigint,"
+            + "i_item_id varchar,"
+            + "i_rec_start_date varchar,"
+            + "i_rec_end_date varchar,"
+            + "i_item_desc varchar,"
+            + "i_current_price double,"
+            + "i_wholesale_cost double,"
+            + "i_brand_id bigint,"
+            + "i_brand varchar,"
+            + "i_class_id bigint,"
+            + "i_class varchar,"
+            + "i_category_id bigint,"
+            + "i_category varchar,"
+            + "i_manufact_id bigint,"
+            + "i_manufact varchar,"
+            + "i_size varchar,"
+            + "i_formulation varchar,"
+            + "i_color varchar,"
+            + "i_units varchar,"
+            + "i_container varchar,"
+            + "i_manager_id bigint,"
+            + "i_product_name varchar";
+    assertEquals(expected, itemSchemaString);
+  }
 
-    @Test
-    public void testDateDimTable() throws Exception {
-        String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
-        String expected = "d_date_sk bigint,"
-                + "d_date_id varchar,"
-                + "d_date varchar,"
-                + "d_month_seq bigint,"
-                + "d_week_seq bigint,"
-                + "d_quarter_seq bigint,"
-                + "d_year bigint,"
-                + "d_dow bigint,"
-                + "d_moy bigint,"
-                + "d_dom bigint,"
-                + "d_qoy bigint,"
-                + "d_fy_year bigint,"
-                + "d_fy_quarter_seq bigint,"
-                + "d_fy_week_seq bigint,"
-                + "d_day_name varchar,"
-                + "d_quarter_name varchar,"
-                + "d_holiday varchar,"
-                + "d_weekend varchar,"
-                + "d_following_holiday varchar,"
-                + "d_first_dom bigint,"
-                + "d_last_dom bigint,"
-                + "d_same_day_ly bigint,"
-                + "d_same_day_lq bigint,"
-                + "d_current_day varchar,"
-                + "d_current_week varchar,"
-                + "d_current_month varchar,"
-                + "d_current_quarter varchar,"
-                + "d_current_year varchar";
-        assertEquals(expected, dateDimSchemaString);
-    }
+  @Test
+  public void testDateDimTable() throws Exception {
+    String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
+    String expected =
+        "d_date_sk bigint,"
+            + "d_date_id varchar,"
+            + "d_date varchar,"
+            + "d_month_seq bigint,"
+            + "d_week_seq bigint,"
+            + "d_quarter_seq bigint,"
+            + "d_year bigint,"
+            + "d_dow bigint,"
+            + "d_moy bigint,"
+            + "d_dom bigint,"
+            + "d_qoy bigint,"
+            + "d_fy_year bigint,"
+            + "d_fy_quarter_seq bigint,"
+            + "d_fy_week_seq bigint,"
+            + "d_day_name varchar,"
+            + "d_quarter_name varchar,"
+            + "d_holiday varchar,"
+            + "d_weekend varchar,"
+            + "d_following_holiday varchar,"
+            + "d_first_dom bigint,"
+            + "d_last_dom bigint,"
+            + "d_same_day_ly bigint,"
+            + "d_same_day_lq bigint,"
+            + "d_current_day varchar,"
+            + "d_current_week varchar,"
+            + "d_current_month varchar,"
+            + "d_current_quarter varchar,"
+            + "d_current_year varchar";
+    assertEquals(expected, dateDimSchemaString);
+  }
 
-    @Test
-    public void testWarehouseTable() throws Exception {
-        String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
-        String expected = "w_warehouse_sk bigint,"
-                + "w_warehouse_id varchar,"
-                + "w_warehouse_name varchar,"
-                + "w_warehouse_sq_ft bigint,"
-                + "w_street_number varchar,"
-                + "w_street_name varchar,"
-                + "w_street_type varchar,"
-                + "w_suite_number varchar,"
-                + "w_city varchar,"
-                + "w_county varchar,"
-                + "w_state varchar,"
-                + "w_zip varchar,"
-                + "w_country varchar,"
-                + "w_gmt_offset double";
-        assertEquals(expected, warehouseSchemaString);
-    }
+  @Test
+  public void testWarehouseTable() throws Exception {
+    String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
+    String expected =
+        "w_warehouse_sk bigint,"
+            + "w_warehouse_id varchar,"
+            + "w_warehouse_name varchar,"
+            + "w_warehouse_sq_ft bigint,"
+            + "w_street_number varchar,"
+            + "w_street_name varchar,"
+            + "w_street_type varchar,"
+            + "w_suite_number varchar,"
+            + "w_city varchar,"
+            + "w_county varchar,"
+            + "w_state varchar,"
+            + "w_zip varchar,"
+            + "w_country varchar,"
+            + "w_gmt_offset double";
+    assertEquals(expected, warehouseSchemaString);
+  }
 
-    @Test
-    public void testGetAllTableNames() {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        Collections.sort(tableNames);
-        List<String> expectedTableNames = Arrays.asList("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics",
-                "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim",
-                "warehouse", "web_page", "web_returns", "web_sales", "web_site");
+  @Test
+  public void testGetAllTableNames() {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    Collections.sort(tableNames);
+    List<String> expectedTableNames =
+        Arrays.asList(
+            "call_center",
+            "catalog_page",
+            "catalog_returns",
+            "catalog_sales",
+            "customer",
+            "customer_address",
+            "customer_demographics",
+            "date_dim",
+            "household_demographics",
+            "income_band",
+            "inventory",
+            "item",
+            "promotion",
+            "reason",
+            "ship_mode",
+            "store",
+            "store_returns",
+            "store_sales",
+            "time_dim",
+            "warehouse",
+            "web_page",
+            "web_returns",
+            "web_sales",
+            "web_site");
 
-        assertEquals(expectedTableNames.size(), tableNames.size());
+    assertEquals(expectedTableNames.size(), tableNames.size());
 
-        for (int i = 0; i < tableNames.size(); i++) {
-            assertEquals(expectedTableNames.get(i), tableNames.get(i));
-        }
+    for (int i = 0; i < tableNames.size(); i++) {
+      assertEquals(expectedTableNames.get(i), tableNames.get(i));
     }
+  }
 }
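The expected values in the tests above are plain comma-joined `name type` pairs. As a minimal JDK-only sketch of how such a schema string can be assembled (the class and method names here are illustrative, not the loader's actual API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaStringSketch {
  // Joins ordered column name/type pairs into the "name type,name type"
  // form the tests above compare against. Illustrative only.
  static String toSchemaString(Map<String, String> columns) {
    return columns.entrySet().stream()
        .map(e -> e.getKey() + " " + e.getValue())
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, String> cols = new LinkedHashMap<>(); // insertion order matters
    cols.put("w_warehouse_sk", "bigint");
    cols.put("w_warehouse_id", "varchar");
    System.out.println(toSchemaString(cols)); // w_warehouse_sk bigint,w_warehouse_id varchar
  }
}
```

A `LinkedHashMap` (or any ordered source) is essential here: the tests compare against a fixed column order, so an unordered `HashMap` would make the joined string nondeterministic.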
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
index 3f8c951..2cd1b0b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
@@ -17,76 +17,76 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class TpcdsParametersReaderTest {
-    private TpcdsOptions tpcdsOptions;
-    private TpcdsOptions tpcdsOptionsError;
+  private TpcdsOptions tpcdsOptions;
+  private TpcdsOptions tpcdsOptionsError;
 
-    @Before
-    public void initializeTpcdsOptions() {
-        tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
-        tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
+  @Before
+  public void initializeTpcdsOptions() {
+    tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
+    tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
 
-        tpcdsOptions.setDataSize("1G");
-        tpcdsOptions.setQueries("1,2,3");
-        tpcdsOptions.setTpcParallel(2);
+    tpcdsOptions.setDataSize("1G");
+    tpcdsOptions.setQueries("1,2,3");
+    tpcdsOptions.setTpcParallel(2);
 
-        tpcdsOptionsError.setDataSize("5G");
-        tpcdsOptionsError.setQueries("0,100");
-        tpcdsOptionsError.setTpcParallel(0);
-    }
+    tpcdsOptionsError.setDataSize("5G");
+    tpcdsOptionsError.setQueries("0,100");
+    tpcdsOptionsError.setTpcParallel(0);
+  }
 
-    @Test
-    public void testGetAndCheckDataSize() throws Exception {
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String expected = "1G";
-        assertEquals(expected, dataSize);
-    }
+  @Test
+  public void testGetAndCheckDataSize() throws Exception {
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String expected = "1G";
+    assertEquals(expected, dataSize);
+  }
 
-    @Test( expected = Exception.class)
-    public void testGetAndCheckDataSizeException() throws Exception {
-        TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void testGetAndCheckDataSizeException() throws Exception {
+    TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
+  }
 
-    @Test
-    public void testGetAndCheckQueries() throws Exception {
-        TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
-        tpcdsOptionsAll.setQueries("all");
-        String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
-        String[] expected = new String[99];
-        for (int i = 0; i < 99; i++) {
-            expected[i] = "query" + (i + 1);
-        }
-        Assert.assertArrayEquals(expected, queryNameArray);
+  @Test
+  public void testGetAndCheckQueries() throws Exception {
+    TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
+    tpcdsOptionsAll.setQueries("all");
+    String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
+    String[] expected = new String[99];
+    for (int i = 0; i < 99; i++) {
+      expected[i] = "query" + (i + 1);
     }
+    Assert.assertArrayEquals(expected, queryNameArray);
+  }
 
-    @Test
-    public void testGetAndCheckAllQueries() throws Exception {
-        String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        String[] expected = {"query1", "query2", "query3"};
-        Assert.assertArrayEquals(expected, queryNameArray);
-    }
+  @Test
+  public void testGetAndCheckAllQueries() throws Exception {
+    String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    String[] expected = {"query1", "query2", "query3"};
+    Assert.assertArrayEquals(expected, queryNameArray);
+  }
 
-    @Test( expected = Exception.class)
-    public void testGetAndCheckQueriesException() throws Exception {
-        TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void testGetAndCheckQueriesException() throws Exception {
+    TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
+  }
 
-    @Test
-    public void testGetAndCheckTpcParallel() throws Exception {
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-        int expected = 2;
-        assertEquals(expected, nThreads);
-    }
+  @Test
+  public void testGetAndCheckTpcParallel() throws Exception {
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+    int expected = 2;
+    assertEquals(expected, nThreads);
+  }
 
-    @Test( expected = Exception.class)
-    public void ttestGetAndCheckTpcParallelException() throws Exception {
-        TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void testGetAndCheckTpcParallelException() throws Exception {
+    TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
+  }
 }
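The tests above pin down the expansion of the `queries` option: `"all"` yields `query1`..`query99`, while a comma-separated list such as `"1,2,3"` yields the corresponding names. A stdlib sketch of that behavior (inferred from the assertions, not copied from `TpcdsParametersReader`):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class QueryNamesSketch {
  // "all" -> query1..query99; "1,2,7" -> query1, query2, query7.
  // Illustrative sketch of the behavior the tests above exercise.
  static String[] expand(String queries) {
    if (queries.equals("all")) {
      return IntStream.rangeClosed(1, 99)
          .mapToObj(i -> "query" + i)
          .toArray(String[]::new);
    }
    return Arrays.stream(queries.split(","))
        .map(String::trim)
        .map(n -> "query" + Integer.parseInt(n)) // rejects non-numeric entries
        .toArray(String[]::new);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(expand("1,2,3"))); // [query1, query2, query3]
    System.out.println(expand("all").length); // 99
  }
}
```

The real reader additionally validates that every number lies between 1 and 99 and throws otherwise, which is what the `expected = Exception.class` tests cover.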
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
index 05402f2..9c43935 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
@@ -17,108 +17,107 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.sdk.schemas.Schema;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TpcdsSchemasTest {
-    private Map<String, Schema> schemaMap;
-    private Map<String, Schema> immutableSchemaMap;
+  private Map<String, Schema> schemaMap;
+  private Map<String, Schema> immutableSchemaMap;
 
-    @Before
-    public void initializeMaps() throws Exception {
-        schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
-    }
+  @Before
+  public void initializeMaps() throws Exception {
+    schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
+  }
 
-    @Test
-    public void testCallCenterSchema() throws Exception {
-        Schema callCenterSchema =
-                Schema.builder()
-                        .addField("cc_call_center_sk", Schema.FieldType.INT64)
-                        .addField("cc_call_center_id", Schema.FieldType.STRING)
-                        .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
-                        .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
-                        .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cc_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_class", Schema.FieldType.STRING)
-                        .addNullableField("cc_employees", Schema.FieldType.INT64)
-                        .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
-                        .addNullableField("cc_hours", Schema.FieldType.STRING)
-                        .addNullableField("cc_manager", Schema.FieldType.STRING)
-                        .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
-                        .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
-                        .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
-                        .addNullableField("cc_market_manager", Schema.FieldType.STRING)
-                        .addNullableField("cc_division", Schema.FieldType.INT64)
-                        .addNullableField("cc_division_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_company", Schema.FieldType.INT64)
-                        .addNullableField("cc_company_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_number", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_type", Schema.FieldType.STRING)
-                        .addNullableField("cc_suite_number", Schema.FieldType.STRING)
-                        .addNullableField("cc_city", Schema.FieldType.STRING)
-                        .addNullableField("cc_county", Schema.FieldType.STRING)
-                        .addNullableField("cc_state", Schema.FieldType.STRING)
-                        .addNullableField("cc_zip", Schema.FieldType.STRING)
-                        .addNullableField("cc_country", Schema.FieldType.STRING)
-                        .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
-                        .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
-                        .build();
+  @Test
+  public void testCallCenterSchema() throws Exception {
+    Schema callCenterSchema =
+        Schema.builder()
+            .addField("cc_call_center_sk", Schema.FieldType.INT64)
+            .addField("cc_call_center_id", Schema.FieldType.STRING)
+            .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+            .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+            .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cc_name", Schema.FieldType.STRING)
+            .addNullableField("cc_class", Schema.FieldType.STRING)
+            .addNullableField("cc_employees", Schema.FieldType.INT64)
+            .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+            .addNullableField("cc_hours", Schema.FieldType.STRING)
+            .addNullableField("cc_manager", Schema.FieldType.STRING)
+            .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+            .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+            .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+            .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+            .addNullableField("cc_division", Schema.FieldType.INT64)
+            .addNullableField("cc_division_name", Schema.FieldType.STRING)
+            .addNullableField("cc_company", Schema.FieldType.INT64)
+            .addNullableField("cc_company_name", Schema.FieldType.STRING)
+            .addNullableField("cc_street_number", Schema.FieldType.STRING)
+            .addNullableField("cc_street_name", Schema.FieldType.STRING)
+            .addNullableField("cc_street_type", Schema.FieldType.STRING)
+            .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+            .addNullableField("cc_city", Schema.FieldType.STRING)
+            .addNullableField("cc_county", Schema.FieldType.STRING)
+            .addNullableField("cc_state", Schema.FieldType.STRING)
+            .addNullableField("cc_zip", Schema.FieldType.STRING)
+            .addNullableField("cc_country", Schema.FieldType.STRING)
+            .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+            .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+            .build();
 
-        assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
-        assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
-    }
+    assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
+    assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
+  }
 
-    @Test
-    public void testCatalogPageSchemaNullable() throws Exception {
-        Schema catalogPageSchemaNullable =
-                Schema.builder()
-                        .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
-                        .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_department", Schema.FieldType.STRING)
-                        .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
-                        .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
-                        .addNullableField("cp_description", Schema.FieldType.STRING)
-                        .addNullableField("cp_type", Schema.FieldType.STRING)
-                        .build();
+  @Test
+  public void testCatalogPageSchemaNullable() throws Exception {
+    Schema catalogPageSchemaNullable =
+        Schema.builder()
+            .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
+            .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_department", Schema.FieldType.STRING)
+            .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+            .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+            .addNullableField("cp_description", Schema.FieldType.STRING)
+            .addNullableField("cp_type", Schema.FieldType.STRING)
+            .build();
 
-        assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
-        assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
-        assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
-    }
+    assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
+    assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+    assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+  }
 
-    @Test
-    public void testCustomerAddressSchemaNullable() throws Exception {
-        Schema customerAddressSchemaNullable =
-                Schema.builder()
-                        .addNullableField("ca_address_sk", Schema.FieldType.INT64)
-                        .addNullableField("ca_address_id", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_number", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_name", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_type", Schema.FieldType.STRING)
-                        .addNullableField("ca_suite_number", Schema.FieldType.STRING)
-                        .addNullableField("ca_city", Schema.FieldType.STRING)
-                        .addNullableField("ca_county", Schema.FieldType.STRING)
-                        .addNullableField("ca_state", Schema.FieldType.STRING)
-                        .addNullableField("ca_zip", Schema.FieldType.STRING)
-                        .addNullableField("ca_country", Schema.FieldType.STRING)
-                        .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
-                        .addNullableField("ca_location_type", Schema.FieldType.STRING)
-                        .build();
+  @Test
+  public void testCustomerAddressSchemaNullable() throws Exception {
+    Schema customerAddressSchemaNullable =
+        Schema.builder()
+            .addNullableField("ca_address_sk", Schema.FieldType.INT64)
+            .addNullableField("ca_address_id", Schema.FieldType.STRING)
+            .addNullableField("ca_street_number", Schema.FieldType.STRING)
+            .addNullableField("ca_street_name", Schema.FieldType.STRING)
+            .addNullableField("ca_street_type", Schema.FieldType.STRING)
+            .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+            .addNullableField("ca_city", Schema.FieldType.STRING)
+            .addNullableField("ca_county", Schema.FieldType.STRING)
+            .addNullableField("ca_state", Schema.FieldType.STRING)
+            .addNullableField("ca_zip", Schema.FieldType.STRING)
+            .addNullableField("ca_country", Schema.FieldType.STRING)
+            .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+            .addNullableField("ca_location_type", Schema.FieldType.STRING)
+            .build();
 
-        assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
-        assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
-        assertEquals(immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
-    }
+    assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
+    assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+    assertEquals(
+        immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+  }
 }
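The `assertNotEquals`/`assertEquals` pairs above hinge on nullability being part of a schema field's equality. A minimal stand-in (not Beam's `Schema` API) showing why the nullable and non-nullable variants of the same field set compare unequal:

```java
public class NullableFieldSketch {
  // A record's generated equals() covers every component, including the
  // nullable flag, mirroring why schemaMap and immutableSchemaMap entries
  // above differ. Illustrative only.
  record Field(String name, String type, boolean nullable) {}

  public static void main(String[] args) {
    Field required = new Field("cp_catalog_page_sk", "INT64", false);
    Field nullable = new Field("cp_catalog_page_sk", "INT64", true);
    System.out.println(required.equals(nullable)); // false
  }
}
```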

[beam] 03/04: [BEAM-11712] Fix static analysis warnings and typos on TPC-DS module

Posted by ie...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8fe0c5cee1bab62680ebd92a51aed8f3da80e190
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Tue Apr 13 14:20:54 2021 +0200

    [BEAM-11712] Fix static analysis warnings and typos on TPC-DS module
---
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    |   3 +-
 .../java/org/apache/beam/sdk/tpcds/CsvToRow.java   |   4 +-
 .../org/apache/beam/sdk/tpcds/QueryReader.java     |   3 +-
 .../java/org/apache/beam/sdk/tpcds/RowToCsv.java   |   2 +-
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |   2 +-
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      |   6 +-
 .../beam/sdk/tpcds/TpcdsParametersReader.java      |   4 +-
 .../org/apache/beam/sdk/tpcds/TpcdsRunResult.java  |  11 +--
 .../org/apache/beam/sdk/tpcds/TpcdsSchemas.java    | 102 ++++++++++-----------
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |   3 +-
 .../apache/beam/sdk/tpcds/TpcdsSchemasTest.java    |   6 +-
 11 files changed, 68 insertions(+), 78 deletions(-)

diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index 69e676f..304fdd2 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -211,8 +211,7 @@ public class BeamSqlEnvRunner {
         // Transform the result from PCollection<Row> into PCollection<String>, and write it to the
         // location where results are stored.
         PCollection<String> rowStrings =
-            rows.apply(
-                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()));
+            rows.apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString));
         rowStrings.apply(
             TextIO.write()
                 .to(
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
index d66b128..d6c8ed8 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
@@ -33,8 +33,8 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class CsvToRow extends PTransform<PCollection<String>, PCollection<Row>>
     implements Serializable {
-  private Schema schema;
-  private CSVFormat csvFormat;
+  private final Schema schema;
+  private final CSVFormat csvFormat;
 
   public CSVFormat getCsvFormat() {
     return csvFormat;
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index 7b00a37..c6f3253 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -35,7 +35,6 @@ public class QueryReader {
    */
   public static String readQuery(String queryFileName) throws Exception {
     String path = "queries/" + queryFileName + ".sql";
-    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
-    return query;
+    return Resources.toString(Resources.getResource(path), Charsets.UTF_8);
   }
 }
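The cleaned-up `readQuery` above is a Guava one-liner (`Resources.toString`). A JDK-only equivalent of resolving a classpath resource and reading it fully as UTF-8, for readers without Guava on the classpath (the class name and error message are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ReadResourceSketch {
  // Resolve a classpath resource and read it fully as UTF-8, roughly
  // what Resources.toString(Resources.getResource(path), UTF_8) does.
  static String readResource(String path) throws IOException {
    try (InputStream in =
        ReadResourceSketch.class.getClassLoader().getResourceAsStream(path)) {
      if (in == null) {
        throw new IOException("Resource not found: " + path);
      }
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    try {
      // "queries/query1.sql" matches the layout QueryReader assumes; it is
      // absent in a standalone run, so this prints the missing-resource error.
      System.out.println(readResource("queries/query1.sql").length());
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Note the explicit null check: unlike Guava's `Resources.getResource`, `getResourceAsStream` silently returns `null` for a missing resource.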
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
index 40a8cc5..a087948 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
@@ -33,7 +33,7 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
     implements Serializable {
-  private CSVFormat csvFormat;
+  private final CSVFormat csvFormat;
 
   public RowToCsv(CSVFormat csvFormat) {
     this.csvFormat = csvFormat;
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 4f56c1a..bea0261 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -189,7 +189,7 @@ public class SqlTransformRunner {
       try {
         tables
             .apply(SqlTransform.query(queryString))
-            .apply(MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+            .apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString))
             .apply(
                 TextIO.write()
                     .to(
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index b6f8733..0b95853 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.tpcds;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -69,8 +68,7 @@ public class TableSchemaJSONLoader {
           // If the key of the pair is "type", make some modification before appending it to the
           // schemaStringBuilder, then append a comma.
           String typeName = (String) pair.getValue();
-          if (typeName.toLowerCase().equals("identifier")
-              || typeName.toLowerCase().equals("integer")) {
+          if (typeName.equalsIgnoreCase("identifier") || typeName.equalsIgnoreCase("integer")) {
             // Use long type to represent int, prevent overflow
             schemaStringBuilder.append("bigint");
           } else if (typeName.contains("decimal")) {
@@ -106,7 +104,7 @@ public class TableSchemaJSONLoader {
    *
    * @return The list of names of all tables.
    */
-  public static List<String> getAllTableNames() throws IOException, URISyntaxException {
+  public static List<String> getAllTableNames() throws IOException {
     ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
     if (classLoader == null) {
       throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
index 8928292..8d7d1a7 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
@@ -47,7 +47,7 @@ public class TpcdsParametersReader {
 
   /**
    * Get and check queries entered by user. This has to be a string of numbers separated by commas
-   * or "all" which means run all 99 queiries. All query numbers have to be between 1 and 99.
+   * or "all" which means run all 99 queries. All query numbers have to be between 1 and 99.
    *
    * @param tpcdsOptions TpcdsOptions object constructed from user input
    * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
@@ -57,7 +57,7 @@ public class TpcdsParametersReader {
     String queryNums = tpcdsOptions.getQueries();
 
     String[] queryNumArr;
-    if (queryNums.toLowerCase().equals("all")) {
+    if (queryNums.equalsIgnoreCase("all")) {
       // All 99 TPC-DS queries need to be executed.
       queryNumArr = new String[99];
       for (int i = 0; i < 99; i++) {
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
index c89a34a..55fdac1 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -50,14 +50,12 @@ public class TpcdsRunResult {
 
   public Date getStartDate() {
     Timestamp startTimeStamp = new Timestamp(startTime);
-    Date startDate = new Date(startTimeStamp.getTime());
-    return startDate;
+    return new Date(startTimeStamp.getTime());
   }
 
   public Date getEndDate() {
     Timestamp endTimeStamp = new Timestamp(endTime);
-    Date endDate = new Date(endTimeStamp.getTime());
-    return endDate;
+    return new Date(endTimeStamp.getTime());
   }
 
   public double getElapsedTime() {
@@ -80,8 +78,7 @@ public class TpcdsRunResult {
   public String getQueryName() {
     String jobName = getJobName();
     int endIndex = jobName.indexOf("result");
-    String queryName = jobName.substring(0, endIndex);
-    return queryName;
+    return jobName.substring(0, endIndex);
   }
 
   public String getDataSize() throws Exception {
@@ -89,7 +86,7 @@ public class TpcdsRunResult {
     return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
   }
 
-  public String getDialect() throws Exception {
+  public String getDialect() {
     PipelineOptions pipelineOptions = getPipelineOptions();
     String queryPlannerClassName =
         pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
index 7f3d874..9398dfa 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
@@ -73,34 +73,32 @@ public class TpcdsSchemas {
    * @return A map of all tpcds table schemas with their table names as keys.
    */
   public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
-    ImmutableMap<String, Schema> immutableSchemaMap =
-        ImmutableMap.<String, Schema>builder()
-            .put("call_center", callCenterSchema)
-            .put("catalog_page", catalogPageSchema)
-            .put("catalog_returns", catalogReturnsSchema)
-            .put("catalog_sales", catalogSalesSchema)
-            .put("customer", customerSchema)
-            .put("customer_address", customerAddressSchema)
-            .put("customer_demographics", customerDemographicsSchema)
-            .put("date_dim", dateDimSchema)
-            .put("household_demographics", householdDemographicsSchema)
-            .put("income_band", incomeBandSchema)
-            .put("inventory", inventorySchema)
-            .put("item", itemSchema)
-            .put("promotion", promotionSchema)
-            .put("reason", reasonSchema)
-            .put("ship_mode", shipModeSchema)
-            .put("store", storeSchema)
-            .put("store_returns", storeReturnsSchema)
-            .put("store_sales", storeSalesSchema)
-            .put("time_dim", timeDimSchema)
-            .put("warehouse", warehouseSchema)
-            .put("web_page", webPageSchema)
-            .put("web_returns", webReturnsSchema)
-            .put("web_sales", webSalesSchema)
-            .put("web_site", webSiteSchema)
-            .build();
-    return immutableSchemaMap;
+    return ImmutableMap.<String, Schema>builder()
+        .put("call_center", callCenterSchema)
+        .put("catalog_page", catalogPageSchema)
+        .put("catalog_returns", catalogReturnsSchema)
+        .put("catalog_sales", catalogSalesSchema)
+        .put("customer", customerSchema)
+        .put("customer_address", customerAddressSchema)
+        .put("customer_demographics", customerDemographicsSchema)
+        .put("date_dim", dateDimSchema)
+        .put("household_demographics", householdDemographicsSchema)
+        .put("income_band", incomeBandSchema)
+        .put("inventory", inventorySchema)
+        .put("item", itemSchema)
+        .put("promotion", promotionSchema)
+        .put("reason", reasonSchema)
+        .put("ship_mode", shipModeSchema)
+        .put("store", storeSchema)
+        .put("store_returns", storeReturnsSchema)
+        .put("store_sales", storeSalesSchema)
+        .put("time_dim", timeDimSchema)
+        .put("warehouse", warehouseSchema)
+        .put("web_page", webPageSchema)
+        .put("web_returns", webReturnsSchema)
+        .put("web_sales", webSalesSchema)
+        .put("web_site", webSiteSchema)
+        .build();
   }
 
   public static Schema getCallCenterSchema() {
@@ -199,7 +197,7 @@ public class TpcdsSchemas {
     return webSiteSchema;
   }
 
-  private static Schema callCenterSchema =
+  private static final Schema callCenterSchema =
       Schema.builder()
           .addField("cc_call_center_sk", Schema.FieldType.INT64)
           .addField("cc_call_center_id", Schema.FieldType.STRING)
@@ -234,7 +232,7 @@ public class TpcdsSchemas {
           .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema catalogPageSchema =
+  private static final Schema catalogPageSchema =
       Schema.builder()
           .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
           .addField("cp_catalog_page_id", Schema.FieldType.STRING)
@@ -247,7 +245,7 @@ public class TpcdsSchemas {
           .addNullableField("cp_type", Schema.FieldType.STRING)
           .build();
 
-  private static Schema catalogReturnsSchema =
+  private static final Schema catalogReturnsSchema =
       Schema.builder()
           .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
           .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
@@ -278,7 +276,7 @@ public class TpcdsSchemas {
           .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema catalogSalesSchema =
+  private static final Schema catalogSalesSchema =
       Schema.builder()
           .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
           .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
@@ -316,7 +314,7 @@ public class TpcdsSchemas {
           .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema customerSchema =
+  private static final Schema customerSchema =
       Schema.builder()
           .addField("c_customer_sk", Schema.FieldType.INT64)
           .addField("c_customer_id", Schema.FieldType.STRING)
@@ -338,7 +336,7 @@ public class TpcdsSchemas {
           .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
           .build();
 
-  private static Schema customerAddressSchema =
+  private static final Schema customerAddressSchema =
       Schema.builder()
           .addField("ca_address_sk", Schema.FieldType.INT64)
           .addField("ca_address_id", Schema.FieldType.STRING)
@@ -355,7 +353,7 @@ public class TpcdsSchemas {
           .addNullableField("ca_location_type", Schema.FieldType.STRING)
           .build();
 
-  private static Schema customerDemographicsSchema =
+  private static final Schema customerDemographicsSchema =
       Schema.builder()
           .addField("cd_demo_sk", Schema.FieldType.INT64)
           .addNullableField("cd_gender", Schema.FieldType.STRING)
@@ -368,7 +366,7 @@ public class TpcdsSchemas {
           .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
           .build();
 
-  private static Schema dateDimSchema =
+  private static final Schema dateDimSchema =
       Schema.builder()
           .addField("d_date_sk", Schema.FieldType.INT64)
           .addField("d_date_id", Schema.FieldType.STRING)
@@ -400,7 +398,7 @@ public class TpcdsSchemas {
           .addNullableField("d_current_year", Schema.FieldType.STRING)
           .build();
 
-  private static Schema householdDemographicsSchema =
+  private static final Schema householdDemographicsSchema =
       Schema.builder()
           .addField("hd_demo_sk", Schema.FieldType.INT64)
           .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
@@ -409,14 +407,14 @@ public class TpcdsSchemas {
           .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
           .build();
 
-  private static Schema incomeBandSchema =
+  private static final Schema incomeBandSchema =
       Schema.builder()
           .addField("ib_income_band_sk", Schema.FieldType.INT64)
           .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
           .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
           .build();
 
-  private static Schema inventorySchema =
+  private static final Schema inventorySchema =
       Schema.builder()
           .addField("inv_date_sk", Schema.FieldType.INT32)
           .addField("inv_item_sk", Schema.FieldType.INT32)
@@ -424,7 +422,7 @@ public class TpcdsSchemas {
           .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
           .build();
 
-  private static Schema itemSchema =
+  private static final Schema itemSchema =
       Schema.builder()
           .addField("i_item_sk", Schema.FieldType.INT64)
           .addField("i_item_id", Schema.FieldType.STRING)
@@ -450,7 +448,7 @@ public class TpcdsSchemas {
           .addNullableField("i_product_name", Schema.FieldType.STRING)
           .build();
 
-  private static Schema promotionSchema =
+  private static final Schema promotionSchema =
       Schema.builder()
           .addField("p_promo_sk", Schema.FieldType.INT64)
           .addField("p_promo_id", Schema.FieldType.STRING)
@@ -473,14 +471,14 @@ public class TpcdsSchemas {
           .addNullableField("p_discount_active", Schema.FieldType.STRING)
           .build();
 
-  private static Schema reasonSchema =
+  private static final Schema reasonSchema =
       Schema.builder()
           .addField("r_reason_sk", Schema.FieldType.INT64)
           .addField("r_reason_id", Schema.FieldType.STRING)
           .addNullableField("r_reason_desc", Schema.FieldType.STRING)
           .build();
 
-  private static Schema shipModeSchema =
+  private static final Schema shipModeSchema =
       Schema.builder()
           .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
           .addField("sm_ship_mode_id", Schema.FieldType.STRING)
@@ -490,7 +488,7 @@ public class TpcdsSchemas {
           .addNullableField("sm_contract", Schema.FieldType.STRING)
           .build();
 
-  private static Schema storeSchema =
+  private static final Schema storeSchema =
       Schema.builder()
           .addField("s_store_sk", Schema.FieldType.INT64)
           .addField("s_store_id", Schema.FieldType.STRING)
@@ -523,7 +521,7 @@ public class TpcdsSchemas {
           .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema storeReturnsSchema =
+  private static final Schema storeReturnsSchema =
       Schema.builder()
           .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
           .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
@@ -547,7 +545,7 @@ public class TpcdsSchemas {
           .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema storeSalesSchema =
+  private static final Schema storeSalesSchema =
       Schema.builder()
           .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
           .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
@@ -574,7 +572,7 @@ public class TpcdsSchemas {
           .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema timeDimSchema =
+  private static final Schema timeDimSchema =
       Schema.builder()
           .addField("t_time_sk", Schema.FieldType.INT64)
           .addField("t_time_id", Schema.FieldType.STRING)
@@ -588,7 +586,7 @@ public class TpcdsSchemas {
           .addNullableField("t_meal_time", Schema.FieldType.STRING)
           .build();
 
-  private static Schema warehouseSchema =
+  private static final Schema warehouseSchema =
       Schema.builder()
           .addField("w_warehouse_sk", Schema.FieldType.INT64)
           .addField("w_warehouse_id", Schema.FieldType.STRING)
@@ -606,7 +604,7 @@ public class TpcdsSchemas {
           .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema webPageSchema =
+  private static final Schema webPageSchema =
       Schema.builder()
           .addField("wp_web_page_sk", Schema.FieldType.INT64)
           .addField("wp_web_page_id", Schema.FieldType.STRING)
@@ -624,7 +622,7 @@ public class TpcdsSchemas {
           .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
           .build();
 
-  private static Schema webReturnsSchema =
+  private static final Schema webReturnsSchema =
       Schema.builder()
           .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
           .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
@@ -652,7 +650,7 @@ public class TpcdsSchemas {
           .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema webSalesSchema =
+  private static final Schema webSalesSchema =
       Schema.builder()
           .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
           .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
@@ -690,7 +688,7 @@ public class TpcdsSchemas {
           .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
           .build();
 
-  private static Schema webSiteSchema =
+  private static final Schema webSiteSchema =
       Schema.builder()
           .addField("web_site_sk", Schema.FieldType.STRING)
           .addField("web_site_id", Schema.FieldType.STRING)
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 651d6f0..45f6f91 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.tpcds;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -140,7 +139,7 @@ public class TableSchemaJSONLoaderTest {
   }
 
   @Test
-  public void testGetAllTableNames() throws IOException, URISyntaxException {
+  public void testGetAllTableNames() throws IOException {
     List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
     Collections.sort(tableNames);
     List<String> expectedTableNames =
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
index 9c43935..9f2397c 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
@@ -36,7 +36,7 @@ public class TpcdsSchemasTest {
   }
 
   @Test
-  public void testCallCenterSchema() throws Exception {
+  public void testCallCenterSchema() {
     Schema callCenterSchema =
         Schema.builder()
             .addField("cc_call_center_sk", Schema.FieldType.INT64)
@@ -77,7 +77,7 @@ public class TpcdsSchemasTest {
   }
 
   @Test
-  public void testCatalogPageSchemaNullable() throws Exception {
+  public void testCatalogPageSchemaNullable() {
     Schema catalogPageSchemaNullable =
         Schema.builder()
             .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
@@ -97,7 +97,7 @@ public class TpcdsSchemasTest {
   }
 
   @Test
-  public void testCustomerAddressSchemaNullable() throws Exception {
+  public void testCustomerAddressSchemaNullable() {
     Schema customerAddressSchemaNullable =
         Schema.builder()
             .addNullableField("ca_address_sk", Schema.FieldType.INT64)

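The most repeated cleanup in this patch swaps `toLowerCase().equals(...)` for `equalsIgnoreCase(...)` (see the hunks in TableSchemaJSONLoader and TpcdsParametersReader above). Besides avoiding a temporary lowercased copy, `equalsIgnoreCase` sidesteps locale-sensitive case mapping that `String.toLowerCase()` can introduce (e.g. the Turkish dotless i). A minimal standalone sketch of the pattern — the helper name and values are hypothetical, not from the Beam sources:

```java
public class CaseCompareDemo {
  // Hypothetical helper mirroring the "all" check in TpcdsParametersReader.
  static boolean isAllQueries(String queryNums) {
    // Compares characters case-insensitively without allocating a lowercased
    // copy and without default-Locale surprises (e.g. tr-TR: "I" -> "ı").
    return "all".equalsIgnoreCase(queryNums);
  }

  public static void main(String[] args) {
    System.out.println(isAllQueries("ALL"));   // true
    System.out.println(isAllQueries("All"));   // true
    System.out.println(isAllQueries("1,2,7")); // false
  }
}
```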
[beam] 04/04: Merge pull request #14373: [BEAM-11712] Run TPC-DS via BeamSQL and Spark runner

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b3ef2035abf9ca2dd94a11a1a6aa4440df28adb9
Merge: f805f1c 8fe0c5c
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Tue Apr 13 14:22:12 2021 +0200

    Merge pull request #14373: [BEAM-11712] Run TPC-DS via BeamSQL and Spark runner

 sdks/java/testing/tpcds/README.md                  |   68 +
 sdks/java/testing/tpcds/build.gradle               |  108 +-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    |  327 +++--
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  |   50 +-
 .../java/org/apache/beam/sdk/tpcds/CsvToRow.java   |   47 +-
 .../org/apache/beam/sdk/tpcds/QueryReader.java     |   51 +-
 .../java/org/apache/beam/sdk/tpcds/RowToCsv.java   |   38 +-
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  314 +++--
 .../apache/beam/sdk/tpcds/SummaryGenerator.java    |  219 ++--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      |  162 +--
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    |   40 +-
 .../beam/sdk/tpcds/TpcdsOptionsRegistrar.java      |   10 +-
 .../beam/sdk/tpcds/TpcdsParametersReader.java      |  136 +-
 .../java/org/apache/beam/sdk/tpcds/TpcdsRun.java   |   54 +-
 .../org/apache/beam/sdk/tpcds/TpcdsRunResult.java  |  120 +-
 .../org/apache/beam/sdk/tpcds/TpcdsSchemas.java    | 1336 ++++++++++----------
 ...pcdsOptionsRegistrar.java => package-info.java} |   16 +-
 .../org/apache/beam/sdk/tpcds/QueryReaderTest.java |  361 +++---
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  261 ++--
 .../beam/sdk/tpcds/TpcdsParametersReaderTest.java  |  110 +-
 .../apache/beam/sdk/tpcds/TpcdsSchemasTest.java    |  183 ++-
 21 files changed, 2158 insertions(+), 1853 deletions(-)
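One of the static-analysis fixes in SqlTransformRunner replaces the lambda `(Row row) -> row.toString()` with the method reference `Row::toString`. The two forms are behaviorally equivalent for this call; the reference is just the more idiomatic spelling. A self-contained sketch using `Object` as a stand-in for Beam's `Row` (class and field names are illustrative only):

```java
import java.util.function.Function;

public class MethodRefDemo {
  // The lambda form used before the patch...
  static final Function<Object, String> VIA_LAMBDA = (Object row) -> row.toString();
  // ...and the equivalent method reference the patch switches to.
  static final Function<Object, String> VIA_REF = Object::toString;

  public static void main(String[] args) {
    // Both produce the same string for any non-null input.
    System.out.println(VIA_LAMBDA.apply(42).equals(VIA_REF.apply(42))); // true
  }
}
```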