You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/13 12:23:23 UTC

[beam] 01/04: [BEAM-11712] Make up-to-date build file and codestyle

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 28eec3f9d484fb7e9484c8f8c57c30a4fdaf867b
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 11:16:39 2021 +0200

    [BEAM-11712] Make up-to-date build file and codestyle
---
 sdks/java/testing/tpcds/build.gradle               |   99 +-
 .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java    |  328 +++--
 .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java  |   54 +-
 .../java/org/apache/beam/sdk/tpcds/CsvToRow.java   |   47 +-
 .../org/apache/beam/sdk/tpcds/QueryReader.java     |   87 +-
 .../java/org/apache/beam/sdk/tpcds/RowToCsv.java   |   38 +-
 .../apache/beam/sdk/tpcds/SqlTransformRunner.java  |  317 +++--
 .../apache/beam/sdk/tpcds/SummaryGenerator.java    |  219 ++--
 .../beam/sdk/tpcds/TableSchemaJSONLoader.java      |  171 ++-
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java    |   26 +-
 .../beam/sdk/tpcds/TpcdsOptionsRegistrar.java      |   10 +-
 .../beam/sdk/tpcds/TpcdsParametersReader.java      |  136 +-
 .../java/org/apache/beam/sdk/tpcds/TpcdsRun.java   |   54 +-
 .../org/apache/beam/sdk/tpcds/TpcdsRunResult.java  |  123 +-
 .../org/apache/beam/sdk/tpcds/TpcdsSchemas.java    | 1338 ++++++++++----------
 ...pcdsOptionsRegistrar.java => package-info.java} |   16 +-
 .../org/apache/beam/sdk/tpcds/QueryReaderTest.java |  361 +++---
 .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java  |  260 ++--
 .../beam/sdk/tpcds/TpcdsParametersReaderTest.java  |  110 +-
 .../apache/beam/sdk/tpcds/TpcdsSchemasTest.java    |  183 ++-
 20 files changed, 2138 insertions(+), 1839 deletions(-)

diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index c52ec7a..6237776 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -16,38 +16,97 @@
  * limitations under the License.
  */
 
-plugins {
-    id 'java'
-}
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.tpcds',
+        exportJavadoc: false,
+        archivesBaseName: 'beam-sdks-java-tpcds',
+)
 
-description = "Apache Beam :: SDKs :: Java :: TPC-DS Benchark"
+description = "Apache Beam :: SDKs :: Java :: TPC-DS"
 
-version '2.24.0-SNAPSHOT'
+// When running via Gradle, this property can be used to pass commandline arguments
+// to the TPD-DS run
+def tpcdsArgsProperty = "tpcds.args"
 
-sourceCompatibility = 1.8
+// When running via Gradle, this property sets the runner dependency
+def tpcdsRunnerProperty = "tpcds.runner"
+def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
+        ?: ":runners:direct-java"
+def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
+def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
+
+if (isDataflowRunner) {
+    /*
+     * We need to rely on manually specifying these evaluationDependsOn to ensure that
+     * the following projects are evaluated before we evaluate this project. This is because
+     * we are attempting to reference a property from the project directly.
+     */
+    evaluationDependsOn(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+}
 
-repositories {
-    mavenCentral()
+configurations {
+    // A configuration for running the TPC-DS launcher directly from Gradle, which
+    // uses Gradle to put the appropriate dependencies on the Classpath rather than
+    // bundling them into a fat jar
+    gradleRun
 }
 
 dependencies {
-    compile 'com.googlecode.json-simple:json-simple:1.1.1'
-    compile project(path: ":sdks:java:core", configuration: "shadow")
-    compile project(path: ":runners:google-cloud-dataflow-java")
-    compile project(":sdks:java:io:google-cloud-platform")
+    compile library.java.vendored_guava_26_0_jre
+    compile library.java.vendored_calcite_1_20_0
+    compile library.java.commons_csv
+    compile library.java.slf4j_api
+    compile "com.googlecode.json-simple:json-simple:1.1.1"
+    compile "com.alibaba:fastjson:1.2.69"
     compile project(":sdks:java:extensions:sql")
     compile project(":sdks:java:extensions:sql:zetasql")
-    compile group: 'com.google.auto.service', name: 'auto-service', version: '1.0-rc1'
-    testCompile group: 'junit', name: 'junit', version: '4.12'
+    compile project(path: ":runners:google-cloud-dataflow-java")
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    testRuntimeClasspath library.java.slf4j_jdk14
+    testCompile project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntime")
+    gradleRun project(project.path)
+    gradleRun project(path: tpcdsRunnerDependency, configuration: runnerConfiguration)
+
+    // The Spark runner requires the user to provide a Spark dependency. For self-contained
+    // runs with the Spark runner, we can provide such a dependency. This is deliberately phrased
+    // to not hardcode any runner other than :runners:direct-java
+    if (shouldProvideSpark) {
+        gradleRun library.java.spark_core, {
+            exclude group:"org.slf4j", module:"jul-to-slf4j"
+        }
+        gradleRun library.java.spark_sql
+        gradleRun library.java.spark_streaming
+    }
 }
 
-// When running via Gradle, this property can be used to pass commandline arguments
-// to the tpcds run
-def tpcdsArgsProperty = "tpcds.args"
+if (shouldProvideSpark) {
+    configurations.gradleRun {
+        // Using Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
+        exclude group: "org.slf4j", module: "slf4j-jdk14"
+    }
+}
 
 task run(type: JavaExec) {
-    main = "org.apache.beam.sdk.tpcds.BeamTpcds"
-    classpath = sourceSets.main.runtimeClasspath
     def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
-    args tpcdsArgsStr.split()
+    def tpcdsArgsList = new ArrayList<String>()
+    Collections.addAll(tpcdsArgsList, tpcdsArgsStr.split())
+
+    if (isDataflowRunner) {
+        dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
+
+        def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
+                project(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+                        .shadowJar.archivePath
+        // Provide job with a customizable worker jar.
+        // With legacy worker jar, containerImage is set to empty (i.e. to use the internal build).
+        // More context and discussions can be found in PR#6694.
+        tpcdsArgsList.add("--dataflowWorkerJar=${dataflowWorkerJar}".toString())
+        tpcdsArgsList.add('--workerHarnessContainerImage=')
+    }
+
+    main = "org.apache.beam.sdk.tpcds.BeamTpcds"
+    classpath = configurations.gradleRun
+    args tpcdsArgsList.toArray()
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index f94b748..43b97d2 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -18,9 +18,16 @@
 package org.apache.beam.sdk.tpcds;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -34,155 +41,198 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
+ * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and
+ * BeamSqlEnv.parseQuery to run queries.
  */
 public class BeamSqlEnvRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
-
-    private static String buildTableCreateStatement(String tableName) {
-        String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
-        return createStatement;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
+
+  private static String buildTableCreateStatement(String tableName) {
+    String createStatement =
+        "CREATE EXTERNAL TABLE "
+            + tableName
+            + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
+    return createStatement;
+  }
+
+  private static String buildDataLocation(String dataSize, String tableName) {
+    String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+    return dataLocation;
+  }
+
+  /**
+   * Register all tables into BeamSqlEnv, set their schemas, and set the locations where their
+   * corresponding data are stored. Currently this method is not supported by ZetaSQL planner.
+   */
+  private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize)
+      throws Exception {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      String createStatement = buildTableCreateStatement(tableName);
+      String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String dataLocation = buildDataLocation(dataSize, tableName);
+      env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
     }
-
-    private static String buildDataLocation(String dataSize, String tableName) {
-        String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-        return dataLocation;
+  }
+
+  /**
+   * Register all tables into InMemoryMetaStore, set their schemas, and set the locations where
+   * their corresponding data are stored.
+   */
+  private static void registerAllTablesByInMemoryMetaStore(
+      InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
+    JSONObject properties = new JSONObject();
+    properties.put("csvformat", "InformixUnload");
+
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
+      String tableName = entry.getKey();
+      String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+      Schema tableSchema = schemaMap.get(tableName);
+      Table table =
+          Table.builder()
+              .name(tableName)
+              .schema(tableSchema)
+              .location(dataLocation)
+              .properties(properties)
+              .type("text")
+              .build();
+      inMemoryMetaStore.createTable(table);
     }
-
-    /** Register all tables into BeamSqlEnv, set their schemas, and set the locations where their corresponding data are stored.
-     *  Currently this method is not supported by ZetaSQL planner. */
-    private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize) throws Exception {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            String createStatement = buildTableCreateStatement(tableName);
-            String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
-            String dataLocation = buildDataLocation(dataSize, tableName);
-            env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
-        }
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /** Register all tables into InMemoryMetaStore, set their schemas, and set the locations where their corresponding data are stored. */
-    private static void registerAllTablesByInMemoryMetaStore(InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
-        JSONObject properties = new JSONObject();
-        properties.put("csvformat", "InformixUnload");
-
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        for (String tableName : schemaMap.keySet()) {
-            String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-            Schema tableSchema = schemaMap.get(tableName);
-            Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
-            inMemoryMetaStore.createTable(table);
-        }
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+  }
+
+  /**
+   * This is the alternative method in BeamTpcds.main method. Run job using BeamSqlEnv.parseQuery()
+   * method. (Doesn't perform well when running query96).
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingBeamSqlEnv(String[] args) throws Exception {
+    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+    inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+    TpcdsOptions tpcdsOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Using ExecutorService and CompletionService to fulfill multi-threading functionality
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+    // Directly create all tables and register them into inMemoryMetaStore before creating
+    // BeamSqlEnv object.
+    registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
+
+    BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
+    BeamSqlEnv env =
+        BeamSqlEnv.builder(inMemoryMetaStore)
+            .setPipelineOptions(beamSqlPipelineOptions)
+            .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
+            .build();
+
+    // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+    // Execute all queries, transform the each result into a PCollection<String>, write them into
+    // the txt file and store in a GCP directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line arguments, cast
+      // tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for
+      // pipeline execution.
+      TpcdsOptions tpcdsOptionsCopy =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =
+          tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
+
+      // Set a unique job name using the time stamp so that multiple different pipelines can run
+      // together.
+      dataflowPipelineOptionsCopy.setJobName(
+          queryNameArr[i] + "result" + System.currentTimeMillis());
+
+      pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+      String queryString = QueryReader.readQuery(queryNameArr[i]);
+
+      try {
+        // Query execution
+        PCollection<Row> rows =
+            BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+
+        // Transform the result from PCollection<Row> into PCollection<String>, and write it to the
+        // location where results are stored.
+        PCollection<String> rowStrings =
+            rows.apply(
+                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()));
+        rowStrings.apply(
+            TextIO.write()
+                .to(
+                    RESULT_DIRECTORY
+                        + "/"
+                        + dataSize
+                        + "/"
+                        + pipelines[i].getOptions().getJobName())
+                .withSuffix(".txt")
+                .withNumShards(1));
+      } catch (Exception e) {
+        LOG.error("{} failed to execute", queryNameArr[i]);
+        e.printStackTrace();
+      }
+
+      completion.submit(new TpcdsRun(pipelines[i]));
     }
 
-    /**
-     * Print the summary table after all jobs are finished.
-     * @param completion A collection of all TpcdsRunResult that are from finished jobs.
-     * @param numOfResults The number of results in the collection.
-     * @throws Exception
-     */
-    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
-        List<List<String>> summaryRowsList = new ArrayList<>();
-        for (int i = 0; i < numOfResults; i++) {
-            TpcdsRunResult tpcdsRunResult = completion.take().get();
-            List<String> list = new ArrayList<>();
-            list.add(tpcdsRunResult.getQueryName());
-            list.add(tpcdsRunResult.getJobName());
-            list.add(tpcdsRunResult.getDataSize());
-            list.add(tpcdsRunResult.getDialect());
-            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
-            summaryRowsList.add(list);
-        }
-
-        System.out.println(SUMMARY_START);
-        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
-    }
+    executor.shutdown();
 
-    /**
-     * This is the alternative method in BeamTpcds.main method. Run job using BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96).
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void runUsingBeamSqlEnv(String[] args) throws Exception {
-        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
-        inMemoryMetaStore.registerProvider(new TextTableProvider());
-
-        TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
-        // Using ExecutorService and CompletionService to fulfill multi-threading functionality
-        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
-        // Directly create all tables and register them into inMemoryMetaStore before creating BeamSqlEnv object.
-        registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
-
-        BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
-        BeamSqlEnv env =
-                BeamSqlEnv
-                        .builder(inMemoryMetaStore)
-                        .setPipelineOptions(beamSqlPipelineOptions)
-                        .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
-                        .build();
-
-        // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
-        Pipeline[] pipelines = new Pipeline[queryNameArr.length];
-
-        // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
-        for (int i = 0; i < queryNameArr.length; i++) {
-            // For each query, get a copy of pipelineOptions from command line arguments, cast tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for pipeline execution.
-            TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-            DataflowPipelineOptions dataflowPipelineOptionsCopy = tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
-
-            // Set a unique job name using the time stamp so that multiple different pipelines can run together.
-            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
-            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
-            String queryString = QueryReader.readQuery(queryNameArr[i]);
-
-            try {
-                // Query execution
-                PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
-
-                // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
-                PCollection<String> rowStrings = rows.apply(MapElements
-                        .into(TypeDescriptors.strings())
-                        .via((Row row) -> row.toString()));
-                rowStrings.apply(TextIO.write().to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
-            } catch (Exception e) {
-                Log.error("{} failed to execute", queryNameArr[i]);
-                e.printStackTrace();
-            }
-
-            completion.submit(new TpcdsRun(pipelines[i]));
-        }
-
-        executor.shutdown();
-
-        printExecutionSummary(completion, queryNameArr.length);
-    }
+    printExecutionSummary(completion, queryNameArr.length);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index a7f67de..6b25f65 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,43 +17,33 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-
 /**
  * To execute this main() method, run the following example command from the command line.
  *
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- *         --queries=3,26,55 \
- *         --tpcParallel=2 \
- *         --project=apache-beam-testing \
- *         --stagingLocation=gs://beamsql_tpcds_1/staging \
- *         --tempLocation=gs://beamsql_tpcds_2/temp \
- *         --runner=DataflowRunner \
- *         --region=us-west1 \
- *         --maxNumWorkers=10"
- *
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
+ * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
+ * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
  *
- * To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+ * <p>To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the
+ * plannerName as below. If not specified, the default planner is Calcite.
  *
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- *         --queries=96 \
- *         --tpcParallel=2 \
- *         --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- *         --project=apache-beam-testing \
- *         --stagingLocation=gs://beamsql_tpcds_1/staging \
- *         --tempLocation=gs://beamsql_tpcds_2/temp \
- *         --runner=DataflowRunner \
- *         --region=us-west1 \
- *         --maxNumWorkers=10"
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
+ * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+ * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
+ * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
  */
 public class BeamTpcds {
-    /**
-     * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or BeamSqlEnvRunner.runUsingBeamSqlEnv()
-     * Currently the former has better performance so it is chosen.
-     *
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        SqlTransformRunner.runUsingSqlTransform(args);
-    }
+  /**
+   * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
+   * BeamSqlEnvRunner.runUsingBeamSqlEnv() Currently the former has better performance so it is
+   * chosen.
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    SqlTransformRunner.runUsingSqlTransform(args);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
index 22519b2..d66b128 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+
+import java.io.Serializable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -25,34 +28,30 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
 
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
-
 /**
- * A readConverter class for TextTable that can read csv file and transform it to PCollection<Row>
+ * A readConverter class for TextTable that can read csv file and transform it to PCollection<Row>.
  */
 public class CsvToRow extends PTransform<PCollection<String>, PCollection<Row>>
-        implements Serializable {
-    private Schema schema;
-    private CSVFormat csvFormat;
+    implements Serializable {
+  private Schema schema;
+  private CSVFormat csvFormat;
 
-    public CSVFormat getCsvFormat() {
-        return csvFormat;
-    }
+  public CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
 
-    public CsvToRow(Schema schema, CSVFormat csvFormat) {
-        this.schema = schema;
-        this.csvFormat = csvFormat;
-    }
+  public CsvToRow(Schema schema, CSVFormat csvFormat) {
+    this.schema = schema;
+    this.csvFormat = csvFormat;
+  }
 
-    @Override
-    public PCollection<Row> expand(PCollection<String> input) {
-        return input
-                .apply(
-                        "csvToRow",
-                        FlatMapElements.into(TypeDescriptors.rows())
-                                .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
-                .setRowSchema(schema);
-    }
+  @Override
+  public PCollection<Row> expand(PCollection<String> input) {
+    return input
+        .apply(
+            "csvToRow",
+            FlatMapElements.into(TypeDescriptors.rows())
+                .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
+        .setRowSchema(schema);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index 1666c78..ca4cf63 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -19,41 +19,66 @@ package org.apache.beam.sdk.tpcds;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
 
 /**
- * The QueryReader reads query file (the file's extension is '.sql' and content doesn't end with a ';'), write the query as a string and return it.
+ * The QueryReader reads query file (the file's extension is '.sql' and content doesn't end with a
+ * ';'), write the query as a string and return it.
  */
 public class QueryReader {
-    /**
-     * Reads a query file (.sql), return the query as a string.
-     * @param queryFileName The name of the query file (such as "query1, query5...") which is stored in resource/queries directory
-     * @return The query string stored in this file.
-     * @throws Exception
-     */
-    public static String readQuery(String queryFileName) throws Exception {
-        // Prepare the file reader.
-        String queryFilePath = Objects.requireNonNull(QueryReader.class.getClassLoader().getResource("queries/" + queryFileName + ".sql")).getPath();
-        File queryFile = new File(queryFilePath);
-        FileReader fileReader = new FileReader(queryFile);
-        BufferedReader reader = new BufferedReader(fileReader);
-
-        // Read the file into stringBuilder.
-        StringBuilder stringBuilder = new StringBuilder();
-        String line;
-        String ls = System.getProperty("line.separator");
-        while ((line = reader.readLine()) != null) {
-            stringBuilder.append(line);
-            stringBuilder.append(ls);
-        }
-
-        // Delete the last new line separator.
-        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-        reader.close();
-
-        String queryString = stringBuilder.toString();
-
-        return queryString;
+  public static String readQuery(String queryFileName) throws Exception {
+    String path = "queries/" + queryFileName + ".sql";
+    String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    return query;
+  }
+
+  /**
+   * Reads a query file (.sql), return the query as a string.
+   *
+   * @param queryFileName The name of the query file (such as "query1, query5...") which is stored
+   *     in resource/queries directory
+   * @return The query string stored in this file.
+   * @throws Exception
+   */
+  public static String readQuery2(String queryFileName) throws Exception {
+
+    // Prepare the file reader.
+    ClassLoader classLoader = QueryReader.class.getClassLoader();
+    if (classLoader == null) {
+      throw new RuntimeException("Can't get classloader from QueryReader.");
+    }
+    String path = "queries/" + queryFileName + ".sql";
+
+    URL resource = classLoader.getResource(path);
+    if (resource == null) {
+      throw new RuntimeException("Resource for " + path + " can't be null.");
     }
+    String queryFilePath = Objects.requireNonNull(resource).getPath();
+    File queryFile = new File(queryFilePath);
+    Reader fileReader =
+        new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
+    BufferedReader reader = new BufferedReader(fileReader);
+
+    // Read the file into stringBuilder.
+    StringBuilder stringBuilder = new StringBuilder();
+    String line;
+    String ls = System.getProperty("line.separator");
+    while ((line = reader.readLine()) != null) {
+      stringBuilder.append(line);
+      stringBuilder.append(ls);
+    }
+
+    // Delete the last new line separator.
+    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+    reader.close();
+
+    return stringBuilder.toString();
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
index 3bae75d..40a8cc5 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+
+import java.io.Serializable;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -24,29 +27,26 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
 
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
-
 /**
- * A writeConverter class for TextTable that can transform PCollection<Row> into PCollection<String>, the format of string is determined by CSVFormat
+ * A writeConverter class for TextTable that can transform PCollection<Row> into
+ * PCollection<String>, the format of string is determined by CSVFormat.
  */
 public class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
-        implements Serializable {
-    private CSVFormat csvFormat;
+    implements Serializable {
+  private CSVFormat csvFormat;
 
-    public RowToCsv(CSVFormat csvFormat) {
-        this.csvFormat = csvFormat;
-    }
+  public RowToCsv(CSVFormat csvFormat) {
+    this.csvFormat = csvFormat;
+  }
 
-    public CSVFormat getCsvFormat() {
-        return csvFormat;
-    }
+  public CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
 
-    @Override
-    public PCollection<String> expand(PCollection<Row> input) {
-        return input.apply(
-                "rowToCsv",
-                MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
-    }
+  @Override
+  public PCollection<String> expand(PCollection<Row> input) {
+    return input.apply(
+        "rowToCsv",
+        MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index a40cd12..34274f9 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -17,6 +17,14 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -27,155 +35,184 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.*;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using PCollection and SqlTransform, it uses SqlTransform.query to run queries.
+ * This class executes jobs using PCollection and SqlTransform, it uses SqlTransform.query to run
+ * queries.
  */
 public class SqlTransformRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = LoggerFactory.getLogger(SqlTransform.class);
-
-    /**
-     * Get all tables (in the form of TextTable) needed for a specific query execution
-     * @param pipeline The pipeline that will be run to execute the query
-     * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter (RowToCsv)
-     * @param queryName The name of the query which will be executed (for example: query3, query55, query96)
-     * @return A PCollectionTuple which is constructed by all tables needed for running query.
-     * @throws Exception
-     */
-    private static PCollectionTuple getTables(Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String queryString = QueryReader.readQuery(queryName);
-
-        PCollectionTuple tables = PCollectionTuple.empty(pipeline);
-        for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
-            String tableName = tableSchema.getKey();
-
-            // Only when queryString contains tableName, the table is relevant to this query and will be added. This can avoid reading unnecessary data files.
-            if (queryString.contains(tableName)) {
-                // This is location path where the data are stored
-                String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-
-                PCollection<Row> table =
-                        new TextTable(
-                                tableSchema.getValue(),
-                                filePattern,
-                                new CsvToRow(tableSchema.getValue(), csvFormat),
-                                new RowToCsv(csvFormat))
-                                .buildIOReader(pipeline.begin())
-                                .setCoder(SchemaCoder.of(tableSchema.getValue()))
-                                .setName(tableSchema.getKey());
-
-                tables = tables.and(new TupleTag<>(tableName), table);
-            }
-        }
-        return tables;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);
+
+  /**
+   * Get all tables (in the form of TextTable) needed for a specific query execution.
+   *
+   * @param pipeline The pipeline that will be run to execute the query
+   * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter
+   *     (RowToCsv)
+   * @param queryName The name of the query which will be executed (for example: query3, query55,
+   *     query96)
+   * @return A PCollectionTuple which is constructed by all tables needed for running query.
+   * @throws Exception
+   */
+  private static PCollectionTuple getTables(
+      Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String queryString = QueryReader.readQuery(queryName);
+
+    PCollectionTuple tables = PCollectionTuple.empty(pipeline);
+    for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
+      String tableName = tableSchema.getKey();
+      System.out.println("tableName = " + tableName);
+
+      // Only when queryString contains tableName, the table is relevant to this query and will be
+      // added. This can avoid reading unnecessary data files.
+      if (queryString.contains(tableName)) {
+        // This is location path where the data are stored
+        String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+        System.out.println("filePattern = " + filePattern);
+
+        PCollection<Row> table =
+            new TextTable(
+                    tableSchema.getValue(),
+                    filePattern,
+                    new CsvToRow(tableSchema.getValue(), csvFormat),
+                    new RowToCsv(csvFormat))
+                .buildIOReader(pipeline.begin())
+                .setCoder(SchemaCoder.of(tableSchema.getValue()))
+                .setName(tableSchema.getKey());
+
+        tables = tables.and(new TupleTag<>(tableName), table);
+      }
     }
-
-    /**
-     * Print the summary table after all jobs are finished.
-     * @param completion A collection of all TpcdsRunResult that are from finished jobs.
-     * @param numOfResults The number of results in the collection.
-     * @throws Exception
-     */
-    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
-        List<List<String>> summaryRowsList = new ArrayList<>();
-        for (int i = 0; i < numOfResults; i++) {
-            TpcdsRunResult tpcdsRunResult = completion.take().get();
-            List<String> list = new ArrayList<>();
-            list.add(tpcdsRunResult.getQueryName());
-            list.add(tpcdsRunResult.getJobName());
-            list.add(tpcdsRunResult.getDataSize());
-            list.add(tpcdsRunResult.getDialect());
-            // If the job is not successful, leave the run time related field blank
-            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
-            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
-            summaryRowsList.add(list);
-        }
-
-        System.out.println(SUMMARY_START);
-        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+    return tables;
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      // If the job is not successful, leave the run time related field blank
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /**
-     * This is the default method in BeamTpcds.main method. Run job using SqlTranform.query() method.
-     * @param args Command line arguments
-     * @throws Exception
-     */
-    public static void runUsingSqlTransform(String[] args) throws Exception {
-        TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
-        // Using ExecutorService and CompletionService to fulfill multi-threading functionality
-        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
-        // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
-        Pipeline[] pipelines = new Pipeline[queryNameArr.length];
-        CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
-
-        // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
-        for (int i = 0; i < queryNameArr.length; i++) {
-            // For each query, get a copy of pipelineOptions from command line arguments.
-            TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
-            // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the default one is Calcite, can change to ZetaSQL).
-            BeamSqlPipelineOptions beamSqlPipelineOptionsCopy = tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
-
-            // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set other required pipeline optionsparameters .
-            DataflowPipelineOptions dataflowPipelineOptionsCopy = beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
-
-            // Set a unique job name using the time stamp so that multiple different pipelines can run together.
-            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
-            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
-            String queryString = QueryReader.readQuery(queryNameArr[i]);
-            PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
-
-            try {
-                tables
-                        .apply(
-                                SqlTransform.query(queryString))
-                        .apply(
-                                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
-                        .apply(TextIO.write()
-                                .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
-                                .withSuffix(".txt")
-                                .withNumShards(1));
-            } catch (Exception e) {
-                Log.error("{} failed to execute", queryNameArr[i]);
-                e.printStackTrace();
-            }
-
-            completion.submit(new TpcdsRun(pipelines[i]));
-        }
-
-        executor.shutdown();
-
-        printExecutionSummary(completion, queryNameArr.length);
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+  }
+
+  /**
+   * This is the default method in BeamTpcds.main method. Run job using SqlTranform.query() method.
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingSqlTransform(String[] args) throws Exception {
+    TpcdsOptions tpcdsOptions =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Using ExecutorService and CompletionService to fulfill multi-threading functionality
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+    // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+    CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
+
+    // Execute all queries, transform the each result into a PCollection<String>, write them into
+    // the txt file and store in a GCP directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line arguments.
+      TpcdsOptions tpcdsOptionsCopy =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+      // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the
+      // default one is Calcite, can change to ZetaSQL).
+      BeamSqlPipelineOptions beamSqlPipelineOptionsCopy =
+          tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
+
+      // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set
+      // other required pipeline optionsparameters .
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =
+          beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
+
+      // Set a unique job name using the time stamp so that multiple different pipelines can run
+      // together.
+      dataflowPipelineOptionsCopy.setJobName(
+          queryNameArr[i] + "result" + System.currentTimeMillis());
+
+      pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+      String queryString = QueryReader.readQuery(queryNameArr[i]);
+      PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
+
+      try {
+        tables
+            .apply(SqlTransform.query(queryString))
+            .apply(MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+            .apply(
+                TextIO.write()
+                    .to(
+                        RESULT_DIRECTORY
+                            + "/"
+                            + dataSize
+                            + "/"
+                            + pipelines[i].getOptions().getJobName())
+                    .withSuffix(".txt")
+                    .withNumShards(1));
+      } catch (Exception e) {
+        LOG.error("{} failed to execute", queryNameArr[i]);
+        e.printStackTrace();
+      }
+
+      completion.submit(new TpcdsRun(pipelines[i]));
     }
+
+    executor.shutdown();
+
+    printExecutionSummary(completion, queryNameArr.length);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
index bddb6a8..36f7a28 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
@@ -20,134 +20,157 @@ package org.apache.beam.sdk.tpcds;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
-/**
- * Generate the tpcds queries execution summary on the command line after finishing all jobs.
- */
+/** Generate the tpcds queries execution summary on the command line after finishing all jobs. */
 public class SummaryGenerator {
-    private static final int PADDING_SIZE = 2;
-    private static final String NEW_LINE = "\n";
-    private static final String TABLE_JOINT_SYMBOL = "+";
-    private static final String TABLE_V_SPLIT_SYMBOL = "|";
-    private static final String TABLE_H_SPLIT_SYMBOL = "-";
+  private static final int PADDING_SIZE = 2;
+  private static final String NEW_LINE = "\n";
+  private static final String TABLE_JOINT_SYMBOL = "+";
+  private static final String TABLE_V_SPLIT_SYMBOL = "|";
+  private static final String TABLE_H_SPLIT_SYMBOL = "-";
 
-    public static String generateTable(List<String> headersList, List<List<String>> rowsList,int... overRiddenHeaderHeight) {
-        StringBuilder stringBuilder = new StringBuilder();
+  public static String generateTable(
+      List<String> headersList, List<List<String>> rowsList, int... overRiddenHeaderHeight) {
+    StringBuilder stringBuilder = new StringBuilder();
 
-        int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
+    int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
 
-        Map<Integer,Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
+    Map<Integer, Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
 
-        stringBuilder.append(NEW_LINE);
-        stringBuilder.append(NEW_LINE);
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
-        stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
 
-        for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
-            fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
-        }
-
-        stringBuilder.append(NEW_LINE);
+    for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
+      fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
+    }
 
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
 
-        for (List<String> row : rowsList) {
-            for (int i = 0; i < rowHeight; i++) {
-                stringBuilder.append(NEW_LINE);
-            }
-            for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
-                fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
-            }
-        }
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
 
+    for (List<String> row : rowsList) {
+      for (int i = 0; i < rowHeight; i++) {
         stringBuilder.append(NEW_LINE);
-        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
-        stringBuilder.append(NEW_LINE);
-        stringBuilder.append(NEW_LINE);
-
-        return stringBuilder.toString();
+      }
+      for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
+        fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
+      }
     }
 
-    private static void fillSpace(StringBuilder stringBuilder, int length) {
-        for (int i = 0; i < length; i++) {
-            stringBuilder.append(" ");
-        }
-    }
+    stringBuilder.append(NEW_LINE);
+    createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+    stringBuilder.append(NEW_LINE);
+    stringBuilder.append(NEW_LINE);
 
-    /** Add a rowLine at the beginning, the middle between headersList and rowLists, the end of the summary table. */
-    private static void createRowLine(StringBuilder stringBuilder,int headersListSize, Map<Integer,Integer> columnMaxWidthMapping) {
-        for (int i = 0; i < headersListSize; i++) {
-            if(i == 0) {
-                stringBuilder.append(TABLE_JOINT_SYMBOL);
-            }
-
-            for (int j = 0; j < columnMaxWidthMapping.get(i) + PADDING_SIZE * 2 ; j++) {
-                stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
-            }
-            stringBuilder.append(TABLE_JOINT_SYMBOL);
-        }
-    }
+    return stringBuilder.toString();
+  }
 
-    /** Get the width of the summary table. */
-    private static Map<Integer,Integer> getMaximumWidthofTable(List<String> headersList, List<List<String>> rowsList) {
-        Map<Integer,Integer> columnMaxWidthMapping = new HashMap<>();
+  private static void fillSpace(StringBuilder stringBuilder, int length) {
+    for (int i = 0; i < length; i++) {
+      stringBuilder.append(" ");
+    }
+  }
+
+  /**
+   * Add a rowLine at the beginning, the middle between headersList and rowLists, the end of the
+   * summary table.
+   */
+  private static void createRowLine(
+      StringBuilder stringBuilder,
+      int headersListSize,
+      Map<Integer, Integer> columnMaxWidthMapping) {
+    for (int i = 0; i < headersListSize; i++) {
+      if (i == 0) {
+        stringBuilder.append(TABLE_JOINT_SYMBOL);
+      }
+
+      int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(i)).orElse(0);
+      for (int j = 0; j < columnMaxWidth + PADDING_SIZE * 2; j++) {
+        stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
+      }
+      stringBuilder.append(TABLE_JOINT_SYMBOL);
+    }
+  }
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            columnMaxWidthMapping.put(columnIndex, 0);
-        }
+  /** Get the width of the summary table. */
+  private static Map<Integer, Integer> getMaximumWidthofTable(
+      List<String> headersList, List<List<String>> rowsList) {
+    Map<Integer, Integer> columnMaxWidthMapping = new HashMap<>();
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            if(headersList.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
-                columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
-            }
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      columnMaxWidthMapping.put(columnIndex, 0);
+    }
 
-        for (List<String> row : rowsList) {
-            for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
-                if(row.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
-                    columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
-                }
-            }
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      Integer columnMaxWidth =
+          Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+      if (headersList.get(columnIndex).length() > columnMaxWidth) {
+        columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
+      }
+    }
 
-        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
-            if(columnMaxWidthMapping.get(columnIndex) % 2 != 0) {
-                columnMaxWidthMapping.put(columnIndex, columnMaxWidthMapping.get(columnIndex) + 1);
-            }
+    for (List<String> row : rowsList) {
+      for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
+        Integer columnMaxWidth =
+            Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+        if (row.get(columnIndex).length() > columnMaxWidth) {
+          columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
         }
-
-        return columnMaxWidthMapping;
+      }
     }
 
-    private static int getOptimumCellPadding(int cellIndex,int datalength,Map<Integer,Integer> columnMaxWidthMapping,int cellPaddingSize) {
-        if(datalength % 2 != 0) {
-            datalength++;
-        }
+    for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+      int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+      if (columnMaxWidth % 2 != 0) {
+        columnMaxWidthMapping.put(columnIndex, columnMaxWidth + 1);
+      }
+    }
 
-        if(datalength < columnMaxWidthMapping.get(cellIndex)) {
-            cellPaddingSize = cellPaddingSize + (columnMaxWidthMapping.get(cellIndex) - datalength) / 2;
-        }
+    return columnMaxWidthMapping;
+  }
 
-        return cellPaddingSize;
+  private static int getOptimumCellPadding(
+      int cellIndex,
+      int datalength,
+      Map<Integer, Integer> columnMaxWidthMapping,
+      int cellPaddingSize) {
+    if (datalength % 2 != 0) {
+      datalength++;
     }
 
-    /** Use space to fill a single cell with optimum cell padding size. */
-    private static void fillCell(StringBuilder stringBuilder,String cell,int cellIndex,Map<Integer,Integer> columnMaxWidthMapping) {
+    int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(cellIndex)).orElse(0);
+    if (datalength < columnMaxWidth) {
+      cellPaddingSize = cellPaddingSize + (columnMaxWidth - datalength) / 2;
+    }
 
-        int cellPaddingSize = getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
+    return cellPaddingSize;
+  }
 
-        if(cellIndex == 0) {
-            stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
-        }
+  /** Use space to fill a single cell with optimum cell padding size. */
+  private static void fillCell(
+      StringBuilder stringBuilder,
+      String cell,
+      int cellIndex,
+      Map<Integer, Integer> columnMaxWidthMapping) {
 
-        fillSpace(stringBuilder, cellPaddingSize);
-        stringBuilder.append(cell);
-        if(cell.length() % 2 != 0) {
-            stringBuilder.append(" ");
-        }
+    int cellPaddingSize =
+        getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
 
-        fillSpace(stringBuilder, cellPaddingSize);
+    if (cellIndex == 0) {
+      stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+    }
 
-        stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+    fillSpace(stringBuilder, cellPaddingSize);
+    stringBuilder.append(cell);
+    if (cell.length() % 2 != 0) {
+      stringBuilder.append(" ");
     }
+
+    fillSpace(stringBuilder, cellPaddingSize);
+
+    stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 420386c..3a2371d 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,96 +17,127 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-
 import java.io.File;
-import java.io.FileReader;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.ArrayList;
-
+import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 
 /**
- * TableSchemaJSONLoader can get all table's names from resource/schemas directory and parse a table's schema into a string.
+ * TableSchemaJSONLoader can get all table's names from resource/schemas directory and parse a
+ * table's schema into a string.
  */
 public class TableSchemaJSONLoader {
-    /**
-     * Read a table schema json file from resource/schemas directory, parse the file into a string which can be utilized by BeamSqlEnv.executeDdl method.
-     * @param tableName The name of the json file to be read (fo example: item, store_sales).
-     * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk bigint, d_date_id varchar"
-     * @throws Exception
-     */
-    public static String parseTableSchema(String tableName) throws Exception {
-        String tableFilePath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas/" + tableName +".json")).getPath();
+  public static String readQuery(String tableName) throws Exception {
+    String path = "schemas/" + tableName + ".json";
+    String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    return fixture;
+  }
 
-        JSONObject jsonObject = (JSONObject) new JSONParser().parse(new FileReader(new File(tableFilePath)));
-        JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+  /**
+   * Read a table schema json file from resource/schemas directory, parse the file into a string
+   * which can be utilized by BeamSqlEnv.executeDdl method.
+   *
+   * @param tableName The name of the json file to be read (fo example: item, store_sales).
+   * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk
+   *     bigint, d_date_id varchar"
+   * @throws Exception
+   */
+  @SuppressWarnings({"rawtypes", "DefaultCharset"})
+  public static String parseTableSchema(String tableName) throws Exception {
+    String path = "schemas/" + tableName + ".json";
+    String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+    System.out.println("schema = " + schema);
 
-        // Iterate each element in jsonArray to construct the schema string
-        StringBuilder schemaStringBuilder = new StringBuilder();
+    JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
+    JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+    if (jsonArray == null) {
+      throw new RuntimeException("Can't get Json array for \"schema\" key.");
+    }
 
-        Iterator jsonArrIterator = jsonArray.iterator();
-        Iterator<Map.Entry> recordIterator;
-        while (jsonArrIterator.hasNext()) {
-            recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
-            while (recordIterator.hasNext()) {
-                Map.Entry pair = recordIterator.next();
+    // Iterate each element in jsonArray to construct the schema string
+    StringBuilder schemaStringBuilder = new StringBuilder();
 
-                if (pair.getKey().equals("type")) {
-                    // If the key of the pair is "type", make some modification before appending it to the schemaStringBuilder, then append a comma.
-                    String typeName = (String) pair.getValue();
-                    if (typeName.toLowerCase().equals("identifier") || typeName.toLowerCase().equals("integer")) {
-                        // Use long type to represent int, prevent overflow
-                        schemaStringBuilder.append("bigint");
-                    } else if (typeName.contains("decimal")) {
-                        // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it for now.
-                        schemaStringBuilder.append("double");
-                    } else {
-                        // Currently Beam SQL doesn't handle "date" type properly, use "varchar" replace it for now.
-                        schemaStringBuilder.append("varchar");
-                    }
-                    schemaStringBuilder.append(',');
-                } else {
-                    // If the key of the pair is "name", directly append it to the StringBuilder, then append a space.
-                    schemaStringBuilder.append((pair.getValue()));
-                    schemaStringBuilder.append(' ');
-                }
-            }
-        }
+    Iterator jsonArrIterator = jsonArray.iterator();
+    Iterator<Map.Entry> recordIterator;
+    while (jsonArrIterator.hasNext()) {
+      recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
+      while (recordIterator.hasNext()) {
+        Map.Entry pair = recordIterator.next();
 
-        // Delete the last ',' in schema string
-        if (schemaStringBuilder.length() > 0) {
-            schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
+        if (pair.getKey().equals("type")) {
+          // If the key of the pair is "type", make some modification before appending it to the
+          // schemaStringBuilder, then append a comma.
+          String typeName = (String) pair.getValue();
+          if (typeName.toLowerCase().equals("identifier")
+              || typeName.toLowerCase().equals("integer")) {
+            // Use long type to represent int, prevent overflow
+            schemaStringBuilder.append("bigint");
+          } else if (typeName.contains("decimal")) {
+            // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it
+            // for now.
+            schemaStringBuilder.append("double");
+          } else {
+            // Currently Beam SQL doesn't handle "date" type properly, use "varchar" replace it for
+            // now.
+            schemaStringBuilder.append("varchar");
+          }
+          schemaStringBuilder.append(',');
+        } else {
+          // If the key of the pair is "name", directly append it to the StringBuilder, then append
+          // a space.
+          schemaStringBuilder.append((pair.getValue()));
+          schemaStringBuilder.append(' ');
         }
+      }
+    }
 
-        String schemaString = schemaStringBuilder.toString();
-
-        return schemaString;
+    // Delete the last ',' in schema string
+    if (schemaStringBuilder.length() > 0) {
+      schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
     }
 
-    /**
-     * Get all tables' names. Tables are stored in resource/schemas directory in the form of json files, such as "item.json", "store_sales.json", they'll be converted to "item", "store_sales".
-     * @return The list of names of all tables.
-     */
-    public static List<String> getAllTableNames() {
-        String tableDirPath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas")).getPath();
-        File tableDir = new File(tableDirPath);
-        File[] tableDirListing = tableDir.listFiles();
+    String schemaString = schemaStringBuilder.toString();
 
-        List<String> tableNames = new ArrayList<>();
+    return schemaString;
+  }
 
-        if (tableDirListing != null) {
-            for (File file : tableDirListing) {
-                // Remove the .json extension in file name
-                tableNames.add(FileNameUtils.getBaseName((file.getName())));
-            }
-        }
+  /**
+   * Get all tables' names. Tables are stored in resource/schemas directory in the form of json
+   * files, such as "item.json", "store_sales.json", they'll be converted to "item", "store_sales".
+   *
+   * @return The list of names of all tables.
+   */
+  public static List<String> getAllTableNames() {
+    ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
+    if (classLoader == null) {
+      throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
+    }
+    URL resource = classLoader.getResource("schemas");
+    if (resource == null) {
+      throw new RuntimeException("Resource for \"schemas\" can't be null.");
+    }
+    String tableDirPath = Objects.requireNonNull(resource).getPath();
+    File tableDir = new File(tableDirPath);
+    File[] tableDirListing = tableDir.listFiles();
 
-        return tableNames;
+    List<String> tableNames = new ArrayList<>();
+
+    if (tableDirListing != null) {
+      for (File file : tableDirListing) {
+        // Remove the .json extension in file name
+        tableNames.add(FileNameUtils.getBaseName((file.getName())));
+      }
     }
+
+    return tableNames;
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index 1c567dd..c693dfd 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -21,22 +21,24 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-/** Options used to configure TPC-DS test */
+/** Options used to configure TPC-DS test. */
 public interface TpcdsOptions extends PipelineOptions {
-    @Description("The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
-    String getDataSize();
+  @Description(
+      "The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
+  String getDataSize();
 
-    void setDataSize(String dataSize);
+  void setDataSize(String dataSize);
 
-    // Set the return type to be String since reading from the command line (user input will be like "1,2,55" which represent TPC-DS query1, query3, query55)
-    @Description("The queries numbers, read user input as string, numbers separated by commas")
-    String getQueries();
+  // Set the return type to be String since reading from the command line (user input will be like
+  // "1,2,55" which represent TPC-DS query1, query3, query55)
+  @Description("The queries numbers, read user input as string, numbers separated by commas")
+  String getQueries();
 
-    void setQueries(String queries);
+  void setQueries(String queries);
 
-    @Description("The number of queries to run in parallel")
-    @Default.Integer(1)
-    Integer getTpcParallel();
+  @Description("The number of queries to run in parallel")
+  @Default.Integer(1)
+  Integer getTpcParallel();
 
-    void setTpcParallel(Integer parallelism);
+  void setTpcParallel(Integer parallelism);
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
index d1ddc9d..40f4f86 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
@@ -24,10 +24,10 @@ import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.Immutabl
 
 /** {@link AutoService} registrar for {@link TpcdsOptions}. */
 @AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
+public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar {
 
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-        return ImmutableList.of(TpcdsOptions.class);
-    }
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.of(TpcdsOptions.class);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
index c281341..8928292 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
@@ -22,86 +22,88 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/**
- * Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid
- */
+/** Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid. */
 public class TpcdsParametersReader {
-    /** The data sizes that have been supported. */
-    private static final Set<String> supportedDataSizes = Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
-
-    /**
-     * Get and check dataSize entered by user. This dataSize has to have been supported.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return The dateSize user entered, if it is contained in supportedDataSizes set.
-     * @throws Exception
-     */
-    public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
-        String dataSize = tpcdsOptions.getDataSize();
+  /** The data sizes that have been supported. */
+  private static final Set<String> supportedDataSizes =
+      Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
 
-        if (!supportedDataSizes.contains(dataSize)) {
-            throw new Exception("The data size you entered has not been supported.");
-        }
+  /**
+   * Get and check dataSize entered by user. This dataSize has to have been supported.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return The dateSize user entered, if it is contained in supportedDataSizes set.
+   * @throws Exception
+   */
+  public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
+    String dataSize = tpcdsOptions.getDataSize();
 
-        return dataSize;
+    if (!supportedDataSizes.contains(dataSize)) {
+      throw new Exception("The data size you entered has not been supported.");
     }
 
-    /**
-     * Get and check queries entered by user. This has to be a string of numbers separated by commas or "all" which means run all 99 queiries.
-     * All query numbers have to be between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
-     * @throws Exception
-     */
-    public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
-        String queryNums = tpcdsOptions.getQueries();
+    return dataSize;
+  }
 
-        String[] queryNumArr;
-        if (queryNums.toLowerCase().equals("all")) {
-            // All 99 TPC-DS queries need to be executed.
-            queryNumArr = new String[99];
-            for (int i = 0; i < 99; i++) {
-                queryNumArr[i] = Integer.toString(i + 1);
-            }
-        } else {
-            // Split user input queryNums by spaces and commas, get an array of all query numbers.
-            queryNumArr = queryNums.split("[\\s,]+");
+  /**
+   * Get and check queries entered by user. This has to be a string of numbers separated by commas
+   * or "all" which means run all 99 queiries. All query numbers have to be between 1 and 99.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
+   * @throws Exception
+   */
+  public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
+    String queryNums = tpcdsOptions.getQueries();
 
-            for (String queryNumStr : queryNumArr) {
-                try {
-                    int queryNum = Integer.parseInt(queryNumStr);
-                    if (queryNum < 1 || queryNum > 99) {
-                        throw new Exception("The queries you entered contains invalid query number, please provide integers between 1 and 99.");
-                    }
-                } catch (NumberFormatException e) {
-                    System.out.println("The queries you entered should be integers, please provide integers between 1 and 99.");
-                }
-            }
-        }
+    String[] queryNumArr;
+    if (queryNums.toLowerCase().equals("all")) {
+      // All 99 TPC-DS queries need to be executed.
+      queryNumArr = new String[99];
+      for (int i = 0; i < 99; i++) {
+        queryNumArr[i] = Integer.toString(i + 1);
+      }
+    } else {
+      // Split user input queryNums by spaces and commas, get an array of all query numbers.
+      queryNumArr = queryNums.split("[\\s,]+");
 
-        String[] queryNameArr = new String[queryNumArr.length];
-        for (int i = 0; i < queryNumArr.length; i++) {
-            queryNameArr[i] = "query" + queryNumArr[i];
+      for (String queryNumStr : queryNumArr) {
+        try {
+          int queryNum = Integer.parseInt(queryNumStr);
+          if (queryNum < 1 || queryNum > 99) {
+            throw new Exception(
+                "The queries you entered contains invalid query number, please provide integers between 1 and 99.");
+          }
+        } catch (NumberFormatException e) {
+          System.out.println(
+              "The queries you entered should be integers, please provide integers between 1 and 99.");
         }
+      }
+    }
 
-        return queryNameArr;
+    String[] queryNameArr = new String[queryNumArr.length];
+    for (int i = 0; i < queryNumArr.length; i++) {
+      queryNameArr[i] = "query" + queryNumArr[i];
     }
 
-    /**
-     * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input.
-     * @return The TpcParallel user entered.
-     * @throws Exception
-     */
-    public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
-        int nThreads = tpcdsOptions.getTpcParallel();
+    return queryNameArr;
+  }
 
-        if (nThreads < 1 || nThreads > 99) {
-            throw new Exception("The TpcParallel your entered is invalid, please provide an integer between 1 and 99.");
-        }
+  /**
+   * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input.
+   * @return The TpcParallel user entered.
+   * @throws Exception
+   */
+  public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
+    int nThreads = tpcdsOptions.getTpcParallel();
 
-        return nThreads;
+    if (nThreads < 1 || nThreads > 99) {
+      throw new Exception(
+          "The TpcParallel your entered is invalid, please provide an integer between 1 and 99.");
     }
+
+    return nThreads;
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
index 1070a88..4b80aa8 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
@@ -17,40 +17,42 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.util.concurrent.Callable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
-import java.util.concurrent.Callable;
 
-/**
- * To fulfill multi-threaded execution
- */
+/** To fulfill multi-threaded execution. */
 public class TpcdsRun implements Callable<TpcdsRunResult> {
-    private final Pipeline pipeline;
+  private final Pipeline pipeline;
 
-    public TpcdsRun (Pipeline pipeline) {
-        this.pipeline = pipeline;
-    }
-
-    @Override
-    public TpcdsRunResult call() {
-        TpcdsRunResult tpcdsRunResult;
+  public TpcdsRun(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
 
-        try {
-            PipelineResult pipelineResult = pipeline.run();
-            long startTimeStamp = System.currentTimeMillis();
-            State state = pipelineResult.waitUntilFinish();
-            long endTimeStamp = System.currentTimeMillis();
+  @Override
+  public TpcdsRunResult call() {
+    TpcdsRunResult tpcdsRunResult;
 
-            // Make sure to set the job status to be successful only when pipelineResult's final state is DONE.
-            boolean isSuccessful = state == State.DONE;
-            tpcdsRunResult = new TpcdsRunResult(isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
-        } catch (Exception e) {
-            // If the pipeline execution failed, return a result with failed status but don't interrupt other threads.
-            e.printStackTrace();
-            tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
-        }
+    try {
+      PipelineResult pipelineResult = pipeline.run();
+      long startTimeStamp = System.currentTimeMillis();
+      State state = pipelineResult.waitUntilFinish();
+      long endTimeStamp = System.currentTimeMillis();
 
-        return tpcdsRunResult;
+      // Make sure to set the job status to be successful only when pipelineResult's final state is
+      // DONE.
+      boolean isSuccessful = state == State.DONE;
+      tpcdsRunResult =
+          new TpcdsRunResult(
+              isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
+    } catch (Exception e) {
+      // If the pipeline execution failed, return a result with failed status but don't interrupt
+      // other threads.
+      e.printStackTrace();
+      tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
     }
+
+    return tpcdsRunResult;
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
index 0e22dce..c89a34a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -17,76 +17,89 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import java.sql.Timestamp;
+import java.util.Date;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import java.sql.Timestamp;
-import java.util.Date;
-
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class TpcdsRunResult {
-    private boolean isSuccessful;
-    private long startTime;
-    private long endTime;
-    private PipelineOptions pipelineOptions;
-    private PipelineResult pipelineResult;
+  private final boolean isSuccessful;
+  private final long startTime;
+  private final long endTime;
+  private final PipelineOptions pipelineOptions;
+  private final @Nullable PipelineResult pipelineResult;
 
-    public TpcdsRunResult(boolean isSuccessful, long startTime, long endTime, PipelineOptions pipelineOptions, PipelineResult pipelineResult) {
-        this.isSuccessful = isSuccessful;
-        this.startTime = startTime;
-        this.endTime = endTime;
-        this.pipelineOptions = pipelineOptions;
-        this.pipelineResult = pipelineResult;
-    }
+  public TpcdsRunResult(
+      boolean isSuccessful,
+      long startTime,
+      long endTime,
+      PipelineOptions pipelineOptions,
+      @Nullable PipelineResult pipelineResult) {
+    this.isSuccessful = isSuccessful;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.pipelineOptions = pipelineOptions;
+    this.pipelineResult = pipelineResult;
+  }
 
-    public boolean getIsSuccessful() { return isSuccessful; }
+  public boolean getIsSuccessful() {
+    return isSuccessful;
+  }
 
-    public Date getStartDate() {
-        Timestamp startTimeStamp = new Timestamp(startTime);
-        Date startDate = new Date(startTimeStamp.getTime());
-        return startDate;
-    }
+  public Date getStartDate() {
+    Timestamp startTimeStamp = new Timestamp(startTime);
+    Date startDate = new Date(startTimeStamp.getTime());
+    return startDate;
+  }
 
-    public Date getEndDate() {
-        Timestamp endTimeStamp = new Timestamp(endTime);
-        Date endDate = new Date(endTimeStamp.getTime());
-        return endDate;
-    }
+  public Date getEndDate() {
+    Timestamp endTimeStamp = new Timestamp(endTime);
+    Date endDate = new Date(endTimeStamp.getTime());
+    return endDate;
+  }
 
-    public double getElapsedTime() {
-        return (endTime - startTime) / 1000.0;
-    }
+  public double getElapsedTime() {
+    return (endTime - startTime) / 1000.0;
+  }
 
-    public PipelineOptions getPipelineOptions() { return pipelineOptions; }
+  public PipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
 
-    public PipelineResult getPipelineResult() { return pipelineResult; }
+  public @Nullable PipelineResult getPipelineResult() {
+    return pipelineResult;
+  }
 
-    public String getJobName() {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        return pipelineOptions.getJobName();
-    }
+  public String getJobName() {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    return pipelineOptions.getJobName();
+  }
 
-    public String getQueryName() {
-        String jobName = getJobName();
-        int endIndex = jobName.indexOf("result");
-        String queryName = jobName.substring(0, endIndex);
-        return queryName;
-    }
+  public String getQueryName() {
+    String jobName = getJobName();
+    int endIndex = jobName.indexOf("result");
+    String queryName = jobName.substring(0, endIndex);
+    return queryName;
+  }
 
-    public String getDataSize() throws Exception {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
-    }
+  public String getDataSize() throws Exception {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
+  }
 
-    public String getDialect() throws Exception {
-        PipelineOptions pipelineOptions = getPipelineOptions();
-        String queryPlannerClassName = pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
-        String dialect;
-        if (queryPlannerClassName.equals("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
-            dialect = "ZetaSQL";
-        } else {
-            dialect = "Calcite";
-        }
-        return dialect;
+  public String getDialect() throws Exception {
+    PipelineOptions pipelineOptions = getPipelineOptions();
+    String queryPlannerClassName =
+        pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
+    String dialect;
+    if (queryPlannerClassName.equals(
+        "org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
+      dialect = "ZetaSQL";
+    } else {
+      dialect = "Calcite";
     }
+    return dialect;
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
index b776551..7f3d874 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
@@ -17,656 +17,706 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 public class TpcdsSchemas {
-    /**
-     * Get all tpcds table schemas automatically by reading json files.
-     * In this case all field will be nullable, this is a bit different from the tpcds specification, but doesn't affect query execution.
-     *
-     * @return A map of all tpcds table schemas with their table names as keys.
-     * @throws Exception
-     */
-    public static Map<String, Schema> getTpcdsSchemas() throws Exception {
-        Map<String, Schema> schemaMap = new HashMap<>();
-
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            Schema.Builder schemaBuilder = Schema.builder();
-
-            String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
-            String[] nameTypePairs = tableSchemaString.split(",");
-
-            for (String nameTypePairString : nameTypePairs) {
-                String[] nameTypePair = nameTypePairString.split("\\s+");
-                String name = nameTypePair[0];
-                String type = nameTypePair[1];
-
-                Schema.FieldType fieldType;
-                if (type.equals("bigint")) {
-                    fieldType = Schema.FieldType.INT64;
-                } else if (type.equals("double")) {
-                    fieldType = Schema.FieldType.DOUBLE;
-                } else {
-                    fieldType = Schema.FieldType.STRING;
-                }
-
-                schemaBuilder.addNullableField(name, fieldType);
-            }
-
-            Schema tableSchema = schemaBuilder.build();
-            schemaMap.put(tableName,tableSchema);
+  /**
+   * Get all tpcds table schemas automatically by reading json files. In this case all field will be
+   * nullable, this is a bit different from the tpcds specification, but doesn't affect query
+   * execution.
+   *
+   * @return A map of all tpcds table schemas with their table names as keys.
+   * @throws Exception
+   */
+  @SuppressWarnings("StringSplitter")
+  public static Map<String, Schema> getTpcdsSchemas() throws Exception {
+    Map<String, Schema> schemaMap = new HashMap<>();
+
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      Schema.Builder schemaBuilder = Schema.builder();
+
+      String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String[] nameTypePairs = tableSchemaString.split(",");
+
+      for (String nameTypePairString : nameTypePairs) {
+        String[] nameTypePair = nameTypePairString.split("\\s+");
+        String name = nameTypePair[0];
+        String type = nameTypePair[1];
+
+        Schema.FieldType fieldType;
+        if (type.equals("bigint")) {
+          fieldType = Schema.FieldType.INT64;
+        } else if (type.equals("double")) {
+          fieldType = Schema.FieldType.DOUBLE;
+        } else {
+          fieldType = Schema.FieldType.STRING;
         }
-        return schemaMap;
-    }
 
-    /**
-     * Get all tpcds table schemas according to tpcds official specification. Some fields are set to be not nullable.
-     *
-     * @return A map of all tpcds table schemas with their table names as keys.
-     */
-    public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
-        ImmutableMap<String, Schema> immutableSchemaMap =
-                ImmutableMap.<String, Schema> builder()
-                        .put("call_center", callCenterSchema)
-                        .put("catalog_page", catalogPageSchema)
-                        .put("catalog_returns", catalogReturnsSchema)
-                        .put("catalog_sales", catalogSalesSchema)
-                        .put("customer", customerSchema)
-                        .put("customer_address", customerAddressSchema)
-                        .put("customer_demographics", customerDemographicsSchema)
-                        .put("date_dim", dateDimSchema)
-                        .put("household_demographics", householdDemographicsSchema)
-                        .put("income_band", incomeBandSchema)
-                        .put("inventory", inventorySchema)
-                        .put("item", itemSchema)
-                        .put("promotion", promotionSchema)
-                        .put("reason", reasonSchema)
-                        .put("ship_mode", shipModeSchema)
-                        .put("store", storeSchema)
-                        .put("store_returns", storeReturnsSchema)
-                        .put("store_sales", storeSalesSchema)
-                        .put("time_dim", timeDimSchema)
-                        .put("warehouse", warehouseSchema)
-                        .put("web_page", webPageSchema)
-                        .put("web_returns", webReturnsSchema)
-                        .put("web_sales", webSalesSchema)
-                        .put("web_site", webSiteSchema)
-                        .build();
-        return immutableSchemaMap;
-    }
+        schemaBuilder.addNullableField(name, fieldType);
+      }
 
-    public static Schema getCallCenterSchema() { return callCenterSchema; }
-
-    public static Schema getCatalogPageSchema() { return catalogPageSchema; }
-
-    public static Schema getCatalogReturnsSchema() { return catalogReturnsSchema; }
-
-    public static Schema getCatalogSalesSchema() { return catalogSalesSchema; }
-
-    public static Schema getCustomerSchema() { return customerSchema; }
-
-    public static Schema getCustomerAddressSchema() { return customerAddressSchema; }
-
-    public static Schema getCustomerDemographicsSchema() { return customerDemographicsSchema; }
-
-    public static Schema getDateDimSchema() { return dateDimSchema; }
-
-    public static Schema getHouseholdDemographicsSchema() { return householdDemographicsSchema; }
-
-    public static Schema getIncomeBandSchema() { return incomeBandSchema; }
-
-    public static Schema getInventorySchema() { return inventorySchema; }
-
-    public static Schema getItemSchema() { return itemSchema; }
-
-    public static Schema getPromotionSchema() { return promotionSchema; }
-
-    public static Schema getReasonSchema() { return reasonSchema; }
-
-    public static Schema getShipModeSchema() { return shipModeSchema; }
-
-    public static Schema getStoreSchema() { return storeSchema; }
-
-    public static Schema getStoreReturnsSchema() { return storeReturnsSchema; }
-
-    public static Schema getStoreSalesSchema() { return storeSalesSchema; }
-
-    public static Schema getTimeDimSchema() { return timeDimSchema; }
-
-    public static Schema getWarehouseSchema() { return warehouseSchema; }
-
-    public static Schema getWebpageSchema() { return webPageSchema; }
-
-    public static Schema getWebReturnsSchema() { return webReturnsSchema; }
-
-    public static Schema getWebSalesSchema() { return webSalesSchema; }
-
-    public static Schema getWebSiteSchema() { return webSiteSchema; }
-
-    private static Schema callCenterSchema =
-            Schema.builder()
-                    .addField("cc_call_center_sk", Schema.FieldType.INT64)
-                    .addField("cc_call_center_id", Schema.FieldType.STRING)
-                    .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cc_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_class", Schema.FieldType.STRING)
-                    .addNullableField("cc_employees", Schema.FieldType.INT64)
-                    .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
-                    .addNullableField("cc_hours", Schema.FieldType.STRING)
-                    .addNullableField("cc_manager", Schema.FieldType.STRING)
-                    .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
-                    .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
-                    .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
-                    .addNullableField("cc_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("cc_division", Schema.FieldType.INT64)
-                    .addNullableField("cc_division_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_company", Schema.FieldType.INT64)
-                    .addNullableField("cc_company_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_number", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_name", Schema.FieldType.STRING)
-                    .addNullableField("cc_street_type", Schema.FieldType.STRING)
-                    .addNullableField("cc_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("cc_city", Schema.FieldType.STRING)
-                    .addNullableField("cc_county", Schema.FieldType.STRING)
-                    .addNullableField("cc_state", Schema.FieldType.STRING)
-                    .addNullableField("cc_zip", Schema.FieldType.STRING)
-                    .addNullableField("cc_country", Schema.FieldType.STRING)
-                    .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema catalogPageSchema =
-            Schema.builder()
-                    .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
-                    .addField("cp_catalog_page_id", Schema.FieldType.STRING)
-                    .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cp_department", Schema.FieldType.STRING)
-                    .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
-                    .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
-                    .addNullableField("cp_description", Schema.FieldType.STRING)
-                    .addNullableField("cp_type", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema catalogReturnsSchema =
-            Schema.builder()
-                    .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
-                    .addField("cr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
-                    .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
-                    .addField("cr_order_number", Schema.FieldType.INT64)
-                    .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema catalogSalesSchema =
-            Schema.builder()
-                    .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
-                    .addField("cs_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
-                    .addField("cs_order_number", Schema.FieldType.INT64)
-                    .addNullableField("cs_quantity", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema customerSchema =
-            Schema.builder()
-                    .addField("c_customer_sk", Schema.FieldType.INT64)
-                    .addField("c_customer_id", Schema.FieldType.STRING)
-                    .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("c_salutation", Schema.FieldType.STRING)
-                    .addNullableField("c_first_name", Schema.FieldType.STRING)
-                    .addNullableField("c_last_name", Schema.FieldType.STRING)
-                    .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
-                    .addNullableField("c_birth_day", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_month", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_year", Schema.FieldType.INT64)
-                    .addNullableField("c_birth_country", Schema.FieldType.STRING)
-                    .addNullableField("c_login", Schema.FieldType.STRING)
-                    .addNullableField("c_email_address", Schema.FieldType.STRING)
-                    .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema customerAddressSchema =
-            Schema.builder()
-                    .addField("ca_address_sk", Schema.FieldType.INT64)
-                    .addField("ca_address_id", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_number", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_name", Schema.FieldType.STRING)
-                    .addNullableField("ca_street_type", Schema.FieldType.STRING)
-                    .addNullableField("ca_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("ca_city", Schema.FieldType.STRING)
-                    .addNullableField("ca_county", Schema.FieldType.STRING)
-                    .addNullableField("ca_state", Schema.FieldType.STRING)
-                    .addNullableField("ca_zip", Schema.FieldType.STRING)
-                    .addNullableField("ca_country", Schema.FieldType.STRING)
-                    .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("ca_location_type", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema customerDemographicsSchema =
-            Schema.builder()
-                    .addField("cd_demo_sk", Schema.FieldType.INT64)
-                    .addNullableField("cd_gender", Schema.FieldType.STRING)
-                    .addNullableField("cd_marital_status", Schema.FieldType.STRING)
-                    .addNullableField("cd_education_status", Schema.FieldType.STRING)
-                    .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
-                    .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
-                    .addNullableField("cd_dep_count", Schema.FieldType.INT64)
-                    .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
-                    .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema dateDimSchema =
-            Schema.builder()
-                    .addField("d_date_sk", Schema.FieldType.INT64)
-                    .addField("d_date_id", Schema.FieldType.STRING)
-                    .addNullableField("d_date", Schema.FieldType.STRING)
-                    .addNullableField("d_month_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_week_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_year", Schema.FieldType.INT64)
-                    .addNullableField("d_dow", Schema.FieldType.INT64)
-                    .addNullableField("d_moy", Schema.FieldType.INT64)
-                    .addNullableField("d_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_qoy", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_year", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
-                    .addNullableField("d_day_name", Schema.FieldType.STRING)
-                    .addNullableField("d_quarter_name", Schema.FieldType.STRING)
-                    .addNullableField("d_holiday", Schema.FieldType.STRING)
-                    .addNullableField("d_weekend", Schema.FieldType.STRING)
-                    .addNullableField("d_following_holiday", Schema.FieldType.STRING)
-                    .addNullableField("d_first_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_last_dom", Schema.FieldType.INT64)
-                    .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
-                    .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
-                    .addNullableField("d_current_day", Schema.FieldType.STRING)
-                    .addNullableField("d_current_week", Schema.FieldType.STRING)
-                    .addNullableField("d_current_month", Schema.FieldType.STRING)
-                    .addNullableField("d_current_quarter", Schema.FieldType.STRING)
-                    .addNullableField("d_current_year", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema householdDemographicsSchema =
-            Schema.builder()
-                    .addField("hd_demo_sk", Schema.FieldType.INT64)
-                    .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
-                    .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
-                    .addNullableField("hd_dep_count", Schema.FieldType.INT64)
-                    .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema incomeBandSchema =
-            Schema.builder()
-                    .addField("ib_income_band_sk", Schema.FieldType.INT64)
-                    .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
-                    .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema inventorySchema =
-            Schema.builder()
-                    .addField("inv_date_sk", Schema.FieldType.INT32)
-                    .addField("inv_item_sk", Schema.FieldType.INT32)
-                    .addField("inv_warehouse_sk", Schema.FieldType.INT32)
-                    .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
-                    .build();
-
-    private static Schema itemSchema =
-            Schema.builder()
-                    .addField("i_item_sk", Schema.FieldType.INT64)
-                    .addField("i_item_id", Schema.FieldType.STRING)
-                    .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("i_item_desc", Schema.FieldType.STRING)
-                    .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("i_brand_id", Schema.FieldType.INT64)
-                    .addNullableField("i_brand", Schema.FieldType.STRING)
-                    .addNullableField("i_class_id", Schema.FieldType.INT64)
-                    .addNullableField("i_class", Schema.FieldType.STRING)
-                    .addNullableField("i_category_id", Schema.FieldType.INT64)
-                    .addNullableField("i_category", Schema.FieldType.STRING)
-                    .addNullableField("i_manufact_id", Schema.FieldType.INT64)
-                    .addNullableField("i_manufact", Schema.FieldType.STRING)
-                    .addNullableField("i_size", Schema.FieldType.STRING)
-                    .addNullableField("i_formulation", Schema.FieldType.STRING)
-                    .addNullableField("i_color", Schema.FieldType.STRING)
-                    .addNullableField("i_units", Schema.FieldType.STRING)
-                    .addNullableField("i_container", Schema.FieldType.STRING)
-                    .addNullableField("i_manager_id", Schema.FieldType.INT64)
-                    .addNullableField("i_product_name", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema promotionSchema =
-            Schema.builder()
-                    .addField("p_promo_sk", Schema.FieldType.INT64)
-                    .addField("p_promo_id", Schema.FieldType.STRING)
-                    .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("p_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("p_response_target", Schema.FieldType.INT64)
-                    .addNullableField("p_promo_name", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_email", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_tv", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_radio", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_press", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_event", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_demo", Schema.FieldType.STRING)
-                    .addNullableField("p_channel_details", Schema.FieldType.STRING)
-                    .addNullableField("p_purpose", Schema.FieldType.STRING)
-                    .addNullableField("p_discount_active", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema reasonSchema =
-            Schema.builder()
-                    .addField("r_reason_sk", Schema.FieldType.INT64)
-                    .addField("r_reason_id", Schema.FieldType.STRING)
-                    .addNullableField("r_reason_desc", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema shipModeSchema =
-            Schema.builder()
-                    .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
-                    .addField("sm_ship_mode_id", Schema.FieldType.STRING)
-                    .addNullableField("sm_type", Schema.FieldType.STRING)
-                    .addNullableField("sm_code", Schema.FieldType.STRING)
-                    .addNullableField("sm_carrier", Schema.FieldType.STRING)
-                    .addNullableField("sm_contract", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema storeSchema =
-            Schema.builder()
-                    .addField("s_store_sk", Schema.FieldType.INT64)
-                    .addField("s_store_id", Schema.FieldType.STRING)
-                    .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("s_store_name", Schema.FieldType.STRING)
-                    .addNullableField("s_number_employees", Schema.FieldType.INT64)
-                    .addNullableField("s_floor_space", Schema.FieldType.INT64)
-                    .addNullableField("s_hours", Schema.FieldType.STRING)
-                    .addNullableField("S_manager", Schema.FieldType.STRING)
-                    .addNullableField("S_market_id", Schema.FieldType.INT64)
-                    .addNullableField("S_geography_class", Schema.FieldType.STRING)
-                    .addNullableField("S_market_desc", Schema.FieldType.STRING)
-                    .addNullableField("s_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("s_division_id", Schema.FieldType.INT64)
-                    .addNullableField("s_division_name", Schema.FieldType.STRING)
-                    .addNullableField("s_company_id", Schema.FieldType.INT64)
-                    .addNullableField("s_company_name", Schema.FieldType.STRING)
-                    .addNullableField("s_street_number", Schema.FieldType.STRING)
-                    .addNullableField("s_street_name", Schema.FieldType.STRING)
-                    .addNullableField("s_street_type", Schema.FieldType.STRING)
-                    .addNullableField("s_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("s_city", Schema.FieldType.STRING)
-                    .addNullableField("s_county", Schema.FieldType.STRING)
-                    .addNullableField("s_state", Schema.FieldType.STRING)
-                    .addNullableField("s_zip", Schema.FieldType.STRING)
-                    .addNullableField("s_country", Schema.FieldType.STRING)
-                    .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema storeReturnsSchema =
-            Schema.builder()
-                    .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
-                    .addField("sr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_store_sk", Schema.FieldType.INT64)
-                    .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
-                    .addField("sr_ticket_number", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema storeSalesSchema =
-            Schema.builder()
-                    .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
-                    .addField("ss_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_store_sk", Schema.FieldType.INT64)
-                    .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
-                    .addField("ss_ticket_number", Schema.FieldType.INT64)
-                    .addNullableField("ss_quantity", Schema.FieldType.INT64)
-                    .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema timeDimSchema =
-            Schema.builder()
-                    .addField("t_time_sk", Schema.FieldType.INT64)
-                    .addField("t_time_id", Schema.FieldType.STRING)
-                    .addNullableField("t_time", Schema.FieldType.INT64)
-                    .addNullableField("t_hour", Schema.FieldType.INT64)
-                    .addNullableField("t_minute", Schema.FieldType.INT64)
-                    .addNullableField("t_second", Schema.FieldType.INT64)
-                    .addNullableField("t_am_pm", Schema.FieldType.STRING)
-                    .addNullableField("t_shift", Schema.FieldType.STRING)
-                    .addNullableField("t_sub_shift", Schema.FieldType.STRING)
-                    .addNullableField("t_meal_time", Schema.FieldType.STRING)
-                    .build();
-
-    private static Schema warehouseSchema =
-            Schema.builder()
-                    .addField("w_warehouse_sk", Schema.FieldType.INT64)
-                    .addField("w_warehouse_id", Schema.FieldType.STRING)
-                    .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
-                    .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
-                    .addNullableField("w_street_number", Schema.FieldType.STRING)
-                    .addNullableField("w_street_name", Schema.FieldType.STRING)
-                    .addNullableField("w_street_type", Schema.FieldType.STRING)
-                    .addNullableField("w_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("w_city", Schema.FieldType.STRING)
-                    .addNullableField("w_county", Schema.FieldType.STRING)
-                    .addNullableField("w_state", Schema.FieldType.STRING)
-                    .addNullableField("w_zip", Schema.FieldType.STRING)
-                    .addNullableField("w_country", Schema.FieldType.STRING)
-                    .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webPageSchema =
-            Schema.builder()
-                    .addField("wp_web_page_sk", Schema.FieldType.INT64)
-                    .addField("wp_web_page_id", Schema.FieldType.STRING)
-                    .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
-                    .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wp_url", Schema.FieldType.STRING)
-                    .addNullableField("wp_type", Schema.FieldType.STRING)
-                    .addNullableField("wp_char_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_link_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_image_count", Schema.FieldType.INT64)
-                    .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
-                    .build();
-
-    private static Schema webReturnsSchema =
-            Schema.builder()
-                    .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
-                    .addField("wr_item_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
-                    .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
-                    .addField("wr_order_number", Schema.FieldType.INT64)
-                    .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
-                    .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
-                    .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webSalesSchema =
-            Schema.builder()
-                    .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
-                    .addField("ws_item_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
-                    .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
-                    .addField("ws_order_number", Schema.FieldType.INT64)
-                    .addNullableField("ws_quantity", Schema.FieldType.INT32)
-                    .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
-                    .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
-                    .build();
-
-    private static Schema webSiteSchema =
-            Schema.builder()
-                    .addField("web_site_sk", Schema.FieldType.STRING)
-                    .addField("web_site_id", Schema.FieldType.STRING)
-                    .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
-                    .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
-                    .addNullableField("web_name", Schema.FieldType.STRING)
-                    .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
-                    .addNullableField("web_class", Schema.FieldType.STRING)
-                    .addNullableField("web_manager", Schema.FieldType.STRING)
-                    .addNullableField("web_mkt_id", Schema.FieldType.INT32)
-                    .addNullableField("web_mkt_class", Schema.FieldType.STRING)
-                    .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
-                    .addNullableField("web_market_manager", Schema.FieldType.STRING)
-                    .addNullableField("web_company_id", Schema.FieldType.INT32)
-                    .addNullableField("web_company_name", Schema.FieldType.STRING)
-                    .addNullableField("web_street_number", Schema.FieldType.STRING)
-                    .addNullableField("web_street_name", Schema.FieldType.STRING)
-                    .addNullableField("web_street_type", Schema.FieldType.STRING)
-                    .addNullableField("web_suite_number", Schema.FieldType.STRING)
-                    .addNullableField("web_city", Schema.FieldType.STRING)
-                    .addNullableField("web_county", Schema.FieldType.STRING)
-                    .addNullableField("web_state", Schema.FieldType.STRING)
-                    .addNullableField("web_zip", Schema.FieldType.STRING)
-                    .addNullableField("web_country", Schema.FieldType.STRING)
-                    .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
-                    .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
-                    .build();
+      Schema tableSchema = schemaBuilder.build();
+      schemaMap.put(tableName, tableSchema);
+    }
+    return schemaMap;
+  }
+
+  /**
+   * Get all tpcds table schemas according to tpcds official specification. Some fields are set to
+   * be not nullable.
+   *
+   * @return A map of all tpcds table schemas with their table names as keys.
+   */
+  public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
+    ImmutableMap<String, Schema> immutableSchemaMap =
+        ImmutableMap.<String, Schema>builder()
+            .put("call_center", callCenterSchema)
+            .put("catalog_page", catalogPageSchema)
+            .put("catalog_returns", catalogReturnsSchema)
+            .put("catalog_sales", catalogSalesSchema)
+            .put("customer", customerSchema)
+            .put("customer_address", customerAddressSchema)
+            .put("customer_demographics", customerDemographicsSchema)
+            .put("date_dim", dateDimSchema)
+            .put("household_demographics", householdDemographicsSchema)
+            .put("income_band", incomeBandSchema)
+            .put("inventory", inventorySchema)
+            .put("item", itemSchema)
+            .put("promotion", promotionSchema)
+            .put("reason", reasonSchema)
+            .put("ship_mode", shipModeSchema)
+            .put("store", storeSchema)
+            .put("store_returns", storeReturnsSchema)
+            .put("store_sales", storeSalesSchema)
+            .put("time_dim", timeDimSchema)
+            .put("warehouse", warehouseSchema)
+            .put("web_page", webPageSchema)
+            .put("web_returns", webReturnsSchema)
+            .put("web_sales", webSalesSchema)
+            .put("web_site", webSiteSchema)
+            .build();
+    return immutableSchemaMap;
+  }
+
+  public static Schema getCallCenterSchema() {
+    return callCenterSchema;
+  }
+
+  public static Schema getCatalogPageSchema() {
+    return catalogPageSchema;
+  }
+
+  public static Schema getCatalogReturnsSchema() {
+    return catalogReturnsSchema;
+  }
+
+  public static Schema getCatalogSalesSchema() {
+    return catalogSalesSchema;
+  }
+
+  public static Schema getCustomerSchema() {
+    return customerSchema;
+  }
+
+  public static Schema getCustomerAddressSchema() {
+    return customerAddressSchema;
+  }
+
+  public static Schema getCustomerDemographicsSchema() {
+    return customerDemographicsSchema;
+  }
+
+  public static Schema getDateDimSchema() {
+    return dateDimSchema;
+  }
+
+  public static Schema getHouseholdDemographicsSchema() {
+    return householdDemographicsSchema;
+  }
+
+  public static Schema getIncomeBandSchema() {
+    return incomeBandSchema;
+  }
+
+  public static Schema getInventorySchema() {
+    return inventorySchema;
+  }
+
+  public static Schema getItemSchema() {
+    return itemSchema;
+  }
+
+  public static Schema getPromotionSchema() {
+    return promotionSchema;
+  }
+
+  public static Schema getReasonSchema() {
+    return reasonSchema;
+  }
+
+  public static Schema getShipModeSchema() {
+    return shipModeSchema;
+  }
+
+  public static Schema getStoreSchema() {
+    return storeSchema;
+  }
+
+  public static Schema getStoreReturnsSchema() {
+    return storeReturnsSchema;
+  }
+
+  public static Schema getStoreSalesSchema() {
+    return storeSalesSchema;
+  }
+
+  public static Schema getTimeDimSchema() {
+    return timeDimSchema;
+  }
+
+  public static Schema getWarehouseSchema() {
+    return warehouseSchema;
+  }
+
+  public static Schema getWebpageSchema() {
+    return webPageSchema;
+  }
+
+  public static Schema getWebReturnsSchema() {
+    return webReturnsSchema;
+  }
+
+  public static Schema getWebSalesSchema() {
+    return webSalesSchema;
+  }
+
+  public static Schema getWebSiteSchema() {
+    return webSiteSchema;
+  }
+
+  private static Schema callCenterSchema =
+      Schema.builder()
+          .addField("cc_call_center_sk", Schema.FieldType.INT64)
+          .addField("cc_call_center_id", Schema.FieldType.STRING)
+          .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cc_name", Schema.FieldType.STRING)
+          .addNullableField("cc_class", Schema.FieldType.STRING)
+          .addNullableField("cc_employees", Schema.FieldType.INT64)
+          .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+          .addNullableField("cc_hours", Schema.FieldType.STRING)
+          .addNullableField("cc_manager", Schema.FieldType.STRING)
+          .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+          .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+          .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+          .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+          .addNullableField("cc_division", Schema.FieldType.INT64)
+          .addNullableField("cc_division_name", Schema.FieldType.STRING)
+          .addNullableField("cc_company", Schema.FieldType.INT64)
+          .addNullableField("cc_company_name", Schema.FieldType.STRING)
+          .addNullableField("cc_street_number", Schema.FieldType.STRING)
+          .addNullableField("cc_street_name", Schema.FieldType.STRING)
+          .addNullableField("cc_street_type", Schema.FieldType.STRING)
+          .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+          .addNullableField("cc_city", Schema.FieldType.STRING)
+          .addNullableField("cc_county", Schema.FieldType.STRING)
+          .addNullableField("cc_state", Schema.FieldType.STRING)
+          .addNullableField("cc_zip", Schema.FieldType.STRING)
+          .addNullableField("cc_country", Schema.FieldType.STRING)
+          .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema catalogPageSchema =
+      Schema.builder()
+          .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
+          .addField("cp_catalog_page_id", Schema.FieldType.STRING)
+          .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cp_department", Schema.FieldType.STRING)
+          .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+          .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+          .addNullableField("cp_description", Schema.FieldType.STRING)
+          .addNullableField("cp_type", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema catalogReturnsSchema =
+      Schema.builder()
+          .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
+          .addField("cr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
+          .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
+          .addField("cr_order_number", Schema.FieldType.INT64)
+          .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema catalogSalesSchema =
+      Schema.builder()
+          .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
+          .addField("cs_item_sk", Schema.FieldType.INT64)
+          .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
+          .addField("cs_order_number", Schema.FieldType.INT64)
+          .addNullableField("cs_quantity", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema customerSchema =
+      Schema.builder()
+          .addField("c_customer_sk", Schema.FieldType.INT64)
+          .addField("c_customer_id", Schema.FieldType.STRING)
+          .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
+          .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
+          .addNullableField("c_salutation", Schema.FieldType.STRING)
+          .addNullableField("c_first_name", Schema.FieldType.STRING)
+          .addNullableField("c_last_name", Schema.FieldType.STRING)
+          .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
+          .addNullableField("c_birth_day", Schema.FieldType.INT64)
+          .addNullableField("c_birth_month", Schema.FieldType.INT64)
+          .addNullableField("c_birth_year", Schema.FieldType.INT64)
+          .addNullableField("c_birth_country", Schema.FieldType.STRING)
+          .addNullableField("c_login", Schema.FieldType.STRING)
+          .addNullableField("c_email_address", Schema.FieldType.STRING)
+          .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema customerAddressSchema =
+      Schema.builder()
+          .addField("ca_address_sk", Schema.FieldType.INT64)
+          .addField("ca_address_id", Schema.FieldType.STRING)
+          .addNullableField("ca_street_number", Schema.FieldType.STRING)
+          .addNullableField("ca_street_name", Schema.FieldType.STRING)
+          .addNullableField("ca_street_type", Schema.FieldType.STRING)
+          .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+          .addNullableField("ca_city", Schema.FieldType.STRING)
+          .addNullableField("ca_county", Schema.FieldType.STRING)
+          .addNullableField("ca_state", Schema.FieldType.STRING)
+          .addNullableField("ca_zip", Schema.FieldType.STRING)
+          .addNullableField("ca_country", Schema.FieldType.STRING)
+          .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("ca_location_type", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema customerDemographicsSchema =
+      Schema.builder()
+          .addField("cd_demo_sk", Schema.FieldType.INT64)
+          .addNullableField("cd_gender", Schema.FieldType.STRING)
+          .addNullableField("cd_marital_status", Schema.FieldType.STRING)
+          .addNullableField("cd_education_status", Schema.FieldType.STRING)
+          .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
+          .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
+          .addNullableField("cd_dep_count", Schema.FieldType.INT64)
+          .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
+          .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema dateDimSchema =
+      Schema.builder()
+          .addField("d_date_sk", Schema.FieldType.INT64)
+          .addField("d_date_id", Schema.FieldType.STRING)
+          .addNullableField("d_date", Schema.FieldType.STRING)
+          .addNullableField("d_month_seq", Schema.FieldType.INT64)
+          .addNullableField("d_week_seq", Schema.FieldType.INT64)
+          .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
+          .addNullableField("d_year", Schema.FieldType.INT64)
+          .addNullableField("d_dow", Schema.FieldType.INT64)
+          .addNullableField("d_moy", Schema.FieldType.INT64)
+          .addNullableField("d_dom", Schema.FieldType.INT64)
+          .addNullableField("d_qoy", Schema.FieldType.INT64)
+          .addNullableField("d_fy_year", Schema.FieldType.INT64)
+          .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
+          .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
+          .addNullableField("d_day_name", Schema.FieldType.STRING)
+          .addNullableField("d_quarter_name", Schema.FieldType.STRING)
+          .addNullableField("d_holiday", Schema.FieldType.STRING)
+          .addNullableField("d_weekend", Schema.FieldType.STRING)
+          .addNullableField("d_following_holiday", Schema.FieldType.STRING)
+          .addNullableField("d_first_dom", Schema.FieldType.INT64)
+          .addNullableField("d_last_dom", Schema.FieldType.INT64)
+          .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
+          .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
+          .addNullableField("d_current_day", Schema.FieldType.STRING)
+          .addNullableField("d_current_week", Schema.FieldType.STRING)
+          .addNullableField("d_current_month", Schema.FieldType.STRING)
+          .addNullableField("d_current_quarter", Schema.FieldType.STRING)
+          .addNullableField("d_current_year", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema householdDemographicsSchema =
+      Schema.builder()
+          .addField("hd_demo_sk", Schema.FieldType.INT64)
+          .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
+          .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
+          .addNullableField("hd_dep_count", Schema.FieldType.INT64)
+          .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema incomeBandSchema =
+      Schema.builder()
+          .addField("ib_income_band_sk", Schema.FieldType.INT64)
+          .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
+          .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema inventorySchema =
+      Schema.builder()
+          .addField("inv_date_sk", Schema.FieldType.INT32)
+          .addField("inv_item_sk", Schema.FieldType.INT32)
+          .addField("inv_warehouse_sk", Schema.FieldType.INT32)
+          .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
+          .build();
+
+  private static Schema itemSchema =
+      Schema.builder()
+          .addField("i_item_sk", Schema.FieldType.INT64)
+          .addField("i_item_id", Schema.FieldType.STRING)
+          .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("i_item_desc", Schema.FieldType.STRING)
+          .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
+          .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("i_brand_id", Schema.FieldType.INT64)
+          .addNullableField("i_brand", Schema.FieldType.STRING)
+          .addNullableField("i_class_id", Schema.FieldType.INT64)
+          .addNullableField("i_class", Schema.FieldType.STRING)
+          .addNullableField("i_category_id", Schema.FieldType.INT64)
+          .addNullableField("i_category", Schema.FieldType.STRING)
+          .addNullableField("i_manufact_id", Schema.FieldType.INT64)
+          .addNullableField("i_manufact", Schema.FieldType.STRING)
+          .addNullableField("i_size", Schema.FieldType.STRING)
+          .addNullableField("i_formulation", Schema.FieldType.STRING)
+          .addNullableField("i_color", Schema.FieldType.STRING)
+          .addNullableField("i_units", Schema.FieldType.STRING)
+          .addNullableField("i_container", Schema.FieldType.STRING)
+          .addNullableField("i_manager_id", Schema.FieldType.INT64)
+          .addNullableField("i_product_name", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema promotionSchema =
+      Schema.builder()
+          .addField("p_promo_sk", Schema.FieldType.INT64)
+          .addField("p_promo_id", Schema.FieldType.STRING)
+          .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
+          .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
+          .addNullableField("p_item_sk", Schema.FieldType.INT64)
+          .addNullableField("p_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("p_response_target", Schema.FieldType.INT64)
+          .addNullableField("p_promo_name", Schema.FieldType.STRING)
+          .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
+          .addNullableField("p_channel_email", Schema.FieldType.STRING)
+          .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
+          .addNullableField("p_channel_tv", Schema.FieldType.STRING)
+          .addNullableField("p_channel_radio", Schema.FieldType.STRING)
+          .addNullableField("p_channel_press", Schema.FieldType.STRING)
+          .addNullableField("p_channel_event", Schema.FieldType.STRING)
+          .addNullableField("p_channel_demo", Schema.FieldType.STRING)
+          .addNullableField("p_channel_details", Schema.FieldType.STRING)
+          .addNullableField("p_purpose", Schema.FieldType.STRING)
+          .addNullableField("p_discount_active", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema reasonSchema =
+      Schema.builder()
+          .addField("r_reason_sk", Schema.FieldType.INT64)
+          .addField("r_reason_id", Schema.FieldType.STRING)
+          .addNullableField("r_reason_desc", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema shipModeSchema =
+      Schema.builder()
+          .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
+          .addField("sm_ship_mode_id", Schema.FieldType.STRING)
+          .addNullableField("sm_type", Schema.FieldType.STRING)
+          .addNullableField("sm_code", Schema.FieldType.STRING)
+          .addNullableField("sm_carrier", Schema.FieldType.STRING)
+          .addNullableField("sm_contract", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema storeSchema =
+      Schema.builder()
+          .addField("s_store_sk", Schema.FieldType.INT64)
+          .addField("s_store_id", Schema.FieldType.STRING)
+          .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
+          .addNullableField("s_store_name", Schema.FieldType.STRING)
+          .addNullableField("s_number_employees", Schema.FieldType.INT64)
+          .addNullableField("s_floor_space", Schema.FieldType.INT64)
+          .addNullableField("s_hours", Schema.FieldType.STRING)
+          .addNullableField("S_manager", Schema.FieldType.STRING)
+          .addNullableField("S_market_id", Schema.FieldType.INT64)
+          .addNullableField("S_geography_class", Schema.FieldType.STRING)
+          .addNullableField("S_market_desc", Schema.FieldType.STRING)
+          .addNullableField("s_market_manager", Schema.FieldType.STRING)
+          .addNullableField("s_division_id", Schema.FieldType.INT64)
+          .addNullableField("s_division_name", Schema.FieldType.STRING)
+          .addNullableField("s_company_id", Schema.FieldType.INT64)
+          .addNullableField("s_company_name", Schema.FieldType.STRING)
+          .addNullableField("s_street_number", Schema.FieldType.STRING)
+          .addNullableField("s_street_name", Schema.FieldType.STRING)
+          .addNullableField("s_street_type", Schema.FieldType.STRING)
+          .addNullableField("s_suite_number", Schema.FieldType.STRING)
+          .addNullableField("s_city", Schema.FieldType.STRING)
+          .addNullableField("s_county", Schema.FieldType.STRING)
+          .addNullableField("s_state", Schema.FieldType.STRING)
+          .addNullableField("s_zip", Schema.FieldType.STRING)
+          .addNullableField("s_country", Schema.FieldType.STRING)
+          .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema storeReturnsSchema =
+      Schema.builder()
+          .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
+          .addField("sr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_store_sk", Schema.FieldType.INT64)
+          .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
+          .addField("sr_ticket_number", Schema.FieldType.INT64)
+          .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema storeSalesSchema =
+      Schema.builder()
+          .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
+          .addField("ss_item_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_store_sk", Schema.FieldType.INT64)
+          .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
+          .addField("ss_ticket_number", Schema.FieldType.INT64)
+          .addNullableField("ss_quantity", Schema.FieldType.INT64)
+          .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema timeDimSchema =
+      Schema.builder()
+          .addField("t_time_sk", Schema.FieldType.INT64)
+          .addField("t_time_id", Schema.FieldType.STRING)
+          .addNullableField("t_time", Schema.FieldType.INT64)
+          .addNullableField("t_hour", Schema.FieldType.INT64)
+          .addNullableField("t_minute", Schema.FieldType.INT64)
+          .addNullableField("t_second", Schema.FieldType.INT64)
+          .addNullableField("t_am_pm", Schema.FieldType.STRING)
+          .addNullableField("t_shift", Schema.FieldType.STRING)
+          .addNullableField("t_sub_shift", Schema.FieldType.STRING)
+          .addNullableField("t_meal_time", Schema.FieldType.STRING)
+          .build();
+
+  private static Schema warehouseSchema =
+      Schema.builder()
+          .addField("w_warehouse_sk", Schema.FieldType.INT64)
+          .addField("w_warehouse_id", Schema.FieldType.STRING)
+          .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
+          .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
+          .addNullableField("w_street_number", Schema.FieldType.STRING)
+          .addNullableField("w_street_name", Schema.FieldType.STRING)
+          .addNullableField("w_street_type", Schema.FieldType.STRING)
+          .addNullableField("w_suite_number", Schema.FieldType.STRING)
+          .addNullableField("w_city", Schema.FieldType.STRING)
+          .addNullableField("w_county", Schema.FieldType.STRING)
+          .addNullableField("w_state", Schema.FieldType.STRING)
+          .addNullableField("w_zip", Schema.FieldType.STRING)
+          .addNullableField("w_country", Schema.FieldType.STRING)
+          .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webPageSchema =
+      Schema.builder()
+          .addField("wp_web_page_sk", Schema.FieldType.INT64)
+          .addField("wp_web_page_id", Schema.FieldType.STRING)
+          .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
+          .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wp_url", Schema.FieldType.STRING)
+          .addNullableField("wp_type", Schema.FieldType.STRING)
+          .addNullableField("wp_char_count", Schema.FieldType.INT64)
+          .addNullableField("wp_link_count", Schema.FieldType.INT64)
+          .addNullableField("wp_image_count", Schema.FieldType.INT64)
+          .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
+          .build();
+
+  private static Schema webReturnsSchema =
+      Schema.builder()
+          .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
+          .addField("wr_item_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
+          .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
+          .addField("wr_order_number", Schema.FieldType.INT64)
+          .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
+          .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
+          .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webSalesSchema =
+      Schema.builder()
+          .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
+          .addField("ws_item_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
+          .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
+          .addField("ws_order_number", Schema.FieldType.INT64)
+          .addNullableField("ws_quantity", Schema.FieldType.INT32)
+          .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+          .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
+          .build();
+
+  private static Schema webSiteSchema =
+      Schema.builder()
+          .addField("web_site_sk", Schema.FieldType.STRING)
+          .addField("web_site_id", Schema.FieldType.STRING)
+          .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
+          .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
+          .addNullableField("web_name", Schema.FieldType.STRING)
+          .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
+          .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
+          .addNullableField("web_class", Schema.FieldType.STRING)
+          .addNullableField("web_manager", Schema.FieldType.STRING)
+          .addNullableField("web_mkt_id", Schema.FieldType.INT32)
+          .addNullableField("web_mkt_class", Schema.FieldType.STRING)
+          .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
+          .addNullableField("web_market_manager", Schema.FieldType.STRING)
+          .addNullableField("web_company_id", Schema.FieldType.INT32)
+          .addNullableField("web_company_name", Schema.FieldType.STRING)
+          .addNullableField("web_street_number", Schema.FieldType.STRING)
+          .addNullableField("web_street_name", Schema.FieldType.STRING)
+          .addNullableField("web_street_type", Schema.FieldType.STRING)
+          .addNullableField("web_suite_number", Schema.FieldType.STRING)
+          .addNullableField("web_city", Schema.FieldType.STRING)
+          .addNullableField("web_county", Schema.FieldType.STRING)
+          .addNullableField("web_state", Schema.FieldType.STRING)
+          .addNullableField("web_zip", Schema.FieldType.STRING)
+          .addNullableField("web_country", Schema.FieldType.STRING)
+          .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
+          .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
+          .build();
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
similarity index 59%
copy from sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
copy to sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
index d1ddc9d..8c2f339 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
@@ -15,19 +15,5 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+/** TPC-DS test suite. */
 package org.apache.beam.sdk.tpcds;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
-
-/** {@link AutoService} registrar for {@link TpcdsOptions}. */
-@AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
-
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-        return ImmutableList.of(TpcdsOptions.class);
-    }
-}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
index 5696410..42f7d5b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
@@ -18,188 +18,193 @@
 package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
+
 import org.junit.Test;
 
 public class QueryReaderTest {
-    private final String headers = "-- Licensed to the Apache Software Foundation (ASF) under one\n" +
-            "-- or more contributor license agreements.  See the NOTICE file\n" +
-            "-- distributed with this work for additional information\n" +
-            "-- regarding copyright ownership.  The ASF licenses this file\n" +
-            "-- to you under the Apache License, Version 2.0 (the\n" +
-            "-- \"License\"); you may not use this file except in compliance\n" +
-            "-- with the License.  You may obtain a copy of the License at\n" +
-            "--\n" +
-            "--     http://www.apache.org/licenses/LICENSE-2.0\n" +
-            "--\n" +
-            "-- Unless required by applicable law or agreed to in writing, software\n" +
-            "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
-            "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
-            "-- See the License for the specific language governing permissions and\n" +
-            "-- limitations under the License.\n";
+  private final String headers =
+      "-- Licensed to the Apache Software Foundation (ASF) under one\n"
+          + "-- or more contributor license agreements.  See the NOTICE file\n"
+          + "-- distributed with this work for additional information\n"
+          + "-- regarding copyright ownership.  The ASF licenses this file\n"
+          + "-- to you under the Apache License, Version 2.0 (the\n"
+          + "-- \"License\"); you may not use this file except in compliance\n"
+          + "-- with the License.  You may obtain a copy of the License at\n"
+          + "--\n"
+          + "--     http://www.apache.org/licenses/LICENSE-2.0\n"
+          + "--\n"
+          + "-- Unless required by applicable law or agreed to in writing, software\n"
+          + "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+          + "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+          + "-- See the License for the specific language governing permissions and\n"
+          + "-- limitations under the License.\n";
 
-    @Test
-    public void testQuery3String() throws Exception {
-        String query3String = QueryReader.readQuery("query3");
-        String expected = "select  dt.d_year \n" +
-                "       ,item.i_brand_id brand_id \n" +
-                "       ,item.i_brand brand\n" +
-                "       ,sum(ss_ext_sales_price) sum_agg\n" +
-                " from  date_dim dt \n" +
-                "      ,store_sales\n" +
-                "      ,item\n" +
-                " where dt.d_date_sk = store_sales.ss_sold_date_sk\n" +
-                "   and store_sales.ss_item_sk = item.i_item_sk\n" +
-                "   and item.i_manufact_id = 436\n" +
-                "   and dt.d_moy=12\n" +
-                " group by dt.d_year\n" +
-                "      ,item.i_brand\n" +
-                "      ,item.i_brand_id\n" +
-                " order by dt.d_year\n" +
-                "         ,sum_agg desc\n" +
-                "         ,brand_id\n" +
-                " limit 100";
-        String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query3StringNoSpaces);
-    }
+  @Test
+  public void testQuery3String() throws Exception {
+    String query3String = QueryReader.readQuery("query3");
+    String expected =
+        "select  dt.d_year \n"
+            + "       ,item.i_brand_id brand_id \n"
+            + "       ,item.i_brand brand\n"
+            + "       ,sum(ss_ext_sales_price) sum_agg\n"
+            + " from  date_dim dt \n"
+            + "      ,store_sales\n"
+            + "      ,item\n"
+            + " where dt.d_date_sk = store_sales.ss_sold_date_sk\n"
+            + "   and store_sales.ss_item_sk = item.i_item_sk\n"
+            + "   and item.i_manufact_id = 436\n"
+            + "   and dt.d_moy=12\n"
+            + " group by dt.d_year\n"
+            + "      ,item.i_brand\n"
+            + "      ,item.i_brand_id\n"
+            + " order by dt.d_year\n"
+            + "         ,sum_agg desc\n"
+            + "         ,brand_id\n"
+            + " limit 100";
+    String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query3StringNoSpaces);
+  }
 
-    @Test
-    public void testQuery4String() throws Exception {
-        String query4String = QueryReader.readQuery("query4");
-        String expected = "with year_total as (\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n" +
-                "       ,'s' sale_type\n" +
-                " from customer\n" +
-                "     ,store_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = ss_customer_sk\n" +
-                "   and ss_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                " union all\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n" +
-                "       ,'c' sale_type\n" +
-                " from customer\n" +
-                "     ,catalog_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = cs_bill_customer_sk\n" +
-                "   and cs_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                "union all\n" +
-                " select c_customer_id customer_id\n" +
-                "       ,c_first_name customer_first_name\n" +
-                "       ,c_last_name customer_last_name\n" +
-                "       ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
-                "       ,c_birth_country customer_birth_country\n" +
-                "       ,c_login customer_login\n" +
-                "       ,c_email_address customer_email_address\n" +
-                "       ,d_year dyear\n" +
-                "       ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n" +
-                "       ,'w' sale_type\n" +
-                " from customer\n" +
-                "     ,web_sales\n" +
-                "     ,date_dim\n" +
-                " where c_customer_sk = ws_bill_customer_sk\n" +
-                "   and ws_sold_date_sk = d_date_sk\n" +
-                " group by c_customer_id\n" +
-                "         ,c_first_name\n" +
-                "         ,c_last_name\n" +
-                "         ,c_preferred_cust_flag\n" +
-                "         ,c_birth_country\n" +
-                "         ,c_login\n" +
-                "         ,c_email_address\n" +
-                "         ,d_year\n" +
-                "         )\n" +
-                "  select  \n" +
-                "                  t_s_secyear.customer_id\n" +
-                "                 ,t_s_secyear.customer_first_name\n" +
-                "                 ,t_s_secyear.customer_last_name\n" +
-                "                 ,t_s_secyear.customer_email_address\n" +
-                " from year_total t_s_firstyear\n" +
-                "     ,year_total t_s_secyear\n" +
-                "     ,year_total t_c_firstyear\n" +
-                "     ,year_total t_c_secyear\n" +
-                "     ,year_total t_w_firstyear\n" +
-                "     ,year_total t_w_secyear\n" +
-                " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_c_secyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n" +
-                "   and t_s_firstyear.customer_id = t_w_secyear.customer_id\n" +
-                "   and t_s_firstyear.sale_type = 's'\n" +
-                "   and t_c_firstyear.sale_type = 'c'\n" +
-                "   and t_w_firstyear.sale_type = 'w'\n" +
-                "   and t_s_secyear.sale_type = 's'\n" +
-                "   and t_c_secyear.sale_type = 'c'\n" +
-                "   and t_w_secyear.sale_type = 'w'\n" +
-                "   and t_s_firstyear.dyear =  2001\n" +
-                "   and t_s_secyear.dyear = 2001+1\n" +
-                "   and t_c_firstyear.dyear =  2001\n" +
-                "   and t_c_secyear.dyear =  2001+1\n" +
-                "   and t_w_firstyear.dyear = 2001\n" +
-                "   and t_w_secyear.dyear = 2001+1\n" +
-                "   and t_s_firstyear.year_total > 0\n" +
-                "   and t_c_firstyear.year_total > 0\n" +
-                "   and t_w_firstyear.year_total > 0\n" +
-                "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
-                "           > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n" +
-                "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
-                "           > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n" +
-                " order by t_s_secyear.customer_id\n" +
-                "         ,t_s_secyear.customer_first_name\n" +
-                "         ,t_s_secyear.customer_last_name\n" +
-                "         ,t_s_secyear.customer_email_address\n" +
-                "limit 100";
-        String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query4StringNoSpaces);
-    }
+  @Test
+  public void testQuery4String() throws Exception {
+    String query4String = QueryReader.readQuery("query4");
+    String expected =
+        "with year_total as (\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n"
+            + "       ,'s' sale_type\n"
+            + " from customer\n"
+            + "     ,store_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = ss_customer_sk\n"
+            + "   and ss_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + " union all\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n"
+            + "       ,'c' sale_type\n"
+            + " from customer\n"
+            + "     ,catalog_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = cs_bill_customer_sk\n"
+            + "   and cs_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + "union all\n"
+            + " select c_customer_id customer_id\n"
+            + "       ,c_first_name customer_first_name\n"
+            + "       ,c_last_name customer_last_name\n"
+            + "       ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+            + "       ,c_birth_country customer_birth_country\n"
+            + "       ,c_login customer_login\n"
+            + "       ,c_email_address customer_email_address\n"
+            + "       ,d_year dyear\n"
+            + "       ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n"
+            + "       ,'w' sale_type\n"
+            + " from customer\n"
+            + "     ,web_sales\n"
+            + "     ,date_dim\n"
+            + " where c_customer_sk = ws_bill_customer_sk\n"
+            + "   and ws_sold_date_sk = d_date_sk\n"
+            + " group by c_customer_id\n"
+            + "         ,c_first_name\n"
+            + "         ,c_last_name\n"
+            + "         ,c_preferred_cust_flag\n"
+            + "         ,c_birth_country\n"
+            + "         ,c_login\n"
+            + "         ,c_email_address\n"
+            + "         ,d_year\n"
+            + "         )\n"
+            + "  select  \n"
+            + "                  t_s_secyear.customer_id\n"
+            + "                 ,t_s_secyear.customer_first_name\n"
+            + "                 ,t_s_secyear.customer_last_name\n"
+            + "                 ,t_s_secyear.customer_email_address\n"
+            + " from year_total t_s_firstyear\n"
+            + "     ,year_total t_s_secyear\n"
+            + "     ,year_total t_c_firstyear\n"
+            + "     ,year_total t_c_secyear\n"
+            + "     ,year_total t_w_firstyear\n"
+            + "     ,year_total t_w_secyear\n"
+            + " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_c_secyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n"
+            + "   and t_s_firstyear.customer_id = t_w_secyear.customer_id\n"
+            + "   and t_s_firstyear.sale_type = 's'\n"
+            + "   and t_c_firstyear.sale_type = 'c'\n"
+            + "   and t_w_firstyear.sale_type = 'w'\n"
+            + "   and t_s_secyear.sale_type = 's'\n"
+            + "   and t_c_secyear.sale_type = 'c'\n"
+            + "   and t_w_secyear.sale_type = 'w'\n"
+            + "   and t_s_firstyear.dyear =  2001\n"
+            + "   and t_s_secyear.dyear = 2001+1\n"
+            + "   and t_c_firstyear.dyear =  2001\n"
+            + "   and t_c_secyear.dyear =  2001+1\n"
+            + "   and t_w_firstyear.dyear = 2001\n"
+            + "   and t_w_secyear.dyear = 2001+1\n"
+            + "   and t_s_firstyear.year_total > 0\n"
+            + "   and t_c_firstyear.year_total > 0\n"
+            + "   and t_w_firstyear.year_total > 0\n"
+            + "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+            + "           > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n"
+            + "   and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+            + "           > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n"
+            + " order by t_s_secyear.customer_id\n"
+            + "         ,t_s_secyear.customer_first_name\n"
+            + "         ,t_s_secyear.customer_last_name\n"
+            + "         ,t_s_secyear.customer_email_address\n"
+            + "limit 100";
+    String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query4StringNoSpaces);
+  }
 
-    @Test
-    public void testQuery55String() throws Exception {
-        String query55String = QueryReader.readQuery("query55");
-        String expected = "select  i_brand_id brand_id, i_brand brand,\n" +
-                " \tsum(ss_ext_sales_price) ext_price\n" +
-                " from date_dim, store_sales, item\n" +
-                " where d_date_sk = ss_sold_date_sk\n" +
-                " \tand ss_item_sk = i_item_sk\n" +
-                " \tand i_manager_id=36\n" +
-                " \tand d_moy=12\n" +
-                " \tand d_year=2001\n" +
-                " group by i_brand, i_brand_id\n" +
-                " order by ext_price desc, i_brand_id\n" +
-                "limit 100";
-        String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
-        String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
-        assertEquals(expectedNoSpaces, query55StringNoSpaces);
-    }
+  @Test
+  public void testQuery55String() throws Exception {
+    String query55String = QueryReader.readQuery("query55");
+    String expected =
+        "select  i_brand_id brand_id, i_brand brand,\n"
+            + " \tsum(ss_ext_sales_price) ext_price\n"
+            + " from date_dim, store_sales, item\n"
+            + " where d_date_sk = ss_sold_date_sk\n"
+            + " \tand ss_item_sk = i_item_sk\n"
+            + " \tand i_manager_id=36\n"
+            + " \tand d_moy=12\n"
+            + " \tand d_year=2001\n"
+            + " group by i_brand, i_brand_id\n"
+            + " order by ext_price desc, i_brand_id\n"
+            + "limit 100";
+    String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
+    String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+    assertEquals(expectedNoSpaces, query55StringNoSpaces);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 7748bee..1d597f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -18,134 +18,160 @@
 package org.apache.beam.sdk.tpcds;
 
 import static org.junit.Assert.assertEquals;
-import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-
+import org.junit.Test;
 
 public class TableSchemaJSONLoaderTest {
-    @Test
-    public void testStoreReturnsTable() throws Exception {
-        String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
-        String expected = "sr_returned_date_sk bigint,"
-                + "sr_return_time_sk bigint,"
-                + "sr_item_sk bigint,"
-                + "sr_customer_sk bigint,"
-                + "sr_cdemo_sk bigint,"
-                + "sr_hdemo_sk bigint,"
-                + "sr_addr_sk bigint,"
-                + "sr_store_sk bigint,"
-                + "sr_reason_sk bigint,"
-                + "sr_ticket_number bigint,"
-                + "sr_return_quantity bigint,"
-                + "sr_return_amt double,"
-                + "sr_return_tax double,"
-                + "sr_return_amt_inc_tax double,"
-                + "sr_fee double,"
-                + "sr_return_ship_cost double,"
-                + "sr_refunded_cash double,"
-                + "sr_reversed_charge double,"
-                + "sr_store_credit double,"
-                + "sr_net_loss double";
-        assertEquals(expected, storeReturnsSchemaString);
-    }
+  @Test
+  public void testStoreReturnsTable() throws Exception {
+    String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
+    String expected =
+        "sr_returned_date_sk bigint,"
+            + "sr_return_time_sk bigint,"
+            + "sr_item_sk bigint,"
+            + "sr_customer_sk bigint,"
+            + "sr_cdemo_sk bigint,"
+            + "sr_hdemo_sk bigint,"
+            + "sr_addr_sk bigint,"
+            + "sr_store_sk bigint,"
+            + "sr_reason_sk bigint,"
+            + "sr_ticket_number bigint,"
+            + "sr_return_quantity bigint,"
+            + "sr_return_amt double,"
+            + "sr_return_tax double,"
+            + "sr_return_amt_inc_tax double,"
+            + "sr_fee double,"
+            + "sr_return_ship_cost double,"
+            + "sr_refunded_cash double,"
+            + "sr_reversed_charge double,"
+            + "sr_store_credit double,"
+            + "sr_net_loss double";
+    assertEquals(expected, storeReturnsSchemaString);
+  }
 
-    @Test
-    public void testItemTable() throws Exception {
-        String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
-        String expected = "i_item_sk bigint,"
-                + "i_item_id varchar,"
-                + "i_rec_start_date varchar,"
-                + "i_rec_end_date varchar,"
-                + "i_item_desc varchar,"
-                + "i_current_price double,"
-                + "i_wholesale_cost double,"
-                + "i_brand_id bigint,"
-                + "i_brand varchar,"
-                + "i_class_id bigint,"
-                + "i_class varchar,"
-                + "i_category_id bigint,"
-                + "i_category varchar,"
-                + "i_manufact_id bigint,"
-                + "i_manufact varchar,"
-                + "i_size varchar,"
-                + "i_formulation varchar,"
-                + "i_color varchar,"
-                + "i_units varchar,"
-                + "i_container varchar,"
-                + "i_manager_id bigint,"
-                + "i_product_name varchar";
-        assertEquals(expected, itemSchemaString);
-    }
+  @Test
+  public void testItemTable() throws Exception {
+    String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
+    String expected =
+        "i_item_sk bigint,"
+            + "i_item_id varchar,"
+            + "i_rec_start_date varchar,"
+            + "i_rec_end_date varchar,"
+            + "i_item_desc varchar,"
+            + "i_current_price double,"
+            + "i_wholesale_cost double,"
+            + "i_brand_id bigint,"
+            + "i_brand varchar,"
+            + "i_class_id bigint,"
+            + "i_class varchar,"
+            + "i_category_id bigint,"
+            + "i_category varchar,"
+            + "i_manufact_id bigint,"
+            + "i_manufact varchar,"
+            + "i_size varchar,"
+            + "i_formulation varchar,"
+            + "i_color varchar,"
+            + "i_units varchar,"
+            + "i_container varchar,"
+            + "i_manager_id bigint,"
+            + "i_product_name varchar";
+    assertEquals(expected, itemSchemaString);
+  }
 
-    @Test
-    public void testDateDimTable() throws Exception {
-        String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
-        String expected = "d_date_sk bigint,"
-                + "d_date_id varchar,"
-                + "d_date varchar,"
-                + "d_month_seq bigint,"
-                + "d_week_seq bigint,"
-                + "d_quarter_seq bigint,"
-                + "d_year bigint,"
-                + "d_dow bigint,"
-                + "d_moy bigint,"
-                + "d_dom bigint,"
-                + "d_qoy bigint,"
-                + "d_fy_year bigint,"
-                + "d_fy_quarter_seq bigint,"
-                + "d_fy_week_seq bigint,"
-                + "d_day_name varchar,"
-                + "d_quarter_name varchar,"
-                + "d_holiday varchar,"
-                + "d_weekend varchar,"
-                + "d_following_holiday varchar,"
-                + "d_first_dom bigint,"
-                + "d_last_dom bigint,"
-                + "d_same_day_ly bigint,"
-                + "d_same_day_lq bigint,"
-                + "d_current_day varchar,"
-                + "d_current_week varchar,"
-                + "d_current_month varchar,"
-                + "d_current_quarter varchar,"
-                + "d_current_year varchar";
-        assertEquals(expected, dateDimSchemaString);
-    }
+  @Test
+  public void testDateDimTable() throws Exception {
+    String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
+    String expected =
+        "d_date_sk bigint,"
+            + "d_date_id varchar,"
+            + "d_date varchar,"
+            + "d_month_seq bigint,"
+            + "d_week_seq bigint,"
+            + "d_quarter_seq bigint,"
+            + "d_year bigint,"
+            + "d_dow bigint,"
+            + "d_moy bigint,"
+            + "d_dom bigint,"
+            + "d_qoy bigint,"
+            + "d_fy_year bigint,"
+            + "d_fy_quarter_seq bigint,"
+            + "d_fy_week_seq bigint,"
+            + "d_day_name varchar,"
+            + "d_quarter_name varchar,"
+            + "d_holiday varchar,"
+            + "d_weekend varchar,"
+            + "d_following_holiday varchar,"
+            + "d_first_dom bigint,"
+            + "d_last_dom bigint,"
+            + "d_same_day_ly bigint,"
+            + "d_same_day_lq bigint,"
+            + "d_current_day varchar,"
+            + "d_current_week varchar,"
+            + "d_current_month varchar,"
+            + "d_current_quarter varchar,"
+            + "d_current_year varchar";
+    assertEquals(expected, dateDimSchemaString);
+  }
 
-    @Test
-    public void testWarehouseTable() throws Exception {
-        String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
-        String expected = "w_warehouse_sk bigint,"
-                + "w_warehouse_id varchar,"
-                + "w_warehouse_name varchar,"
-                + "w_warehouse_sq_ft bigint,"
-                + "w_street_number varchar,"
-                + "w_street_name varchar,"
-                + "w_street_type varchar,"
-                + "w_suite_number varchar,"
-                + "w_city varchar,"
-                + "w_county varchar,"
-                + "w_state varchar,"
-                + "w_zip varchar,"
-                + "w_country varchar,"
-                + "w_gmt_offset double";
-        assertEquals(expected, warehouseSchemaString);
-    }
+  @Test
+  public void testWarehouseTable() throws Exception {
+    String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
+    String expected =
+        "w_warehouse_sk bigint,"
+            + "w_warehouse_id varchar,"
+            + "w_warehouse_name varchar,"
+            + "w_warehouse_sq_ft bigint,"
+            + "w_street_number varchar,"
+            + "w_street_name varchar,"
+            + "w_street_type varchar,"
+            + "w_suite_number varchar,"
+            + "w_city varchar,"
+            + "w_county varchar,"
+            + "w_state varchar,"
+            + "w_zip varchar,"
+            + "w_country varchar,"
+            + "w_gmt_offset double";
+    assertEquals(expected, warehouseSchemaString);
+  }
 
-    @Test
-    public void testGetAllTableNames() {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        Collections.sort(tableNames);
-        List<String> expectedTableNames = Arrays.asList("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics",
-                "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim",
-                "warehouse", "web_page", "web_returns", "web_sales", "web_site");
+  @Test
+  public void testGetAllTableNames() {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    Collections.sort(tableNames);
+    List<String> expectedTableNames =
+        Arrays.asList(
+            "call_center",
+            "catalog_page",
+            "catalog_returns",
+            "catalog_sales",
+            "customer",
+            "customer_address",
+            "customer_demographics",
+            "date_dim",
+            "household_demographics",
+            "income_band",
+            "inventory",
+            "item",
+            "promotion",
+            "reason",
+            "ship_mode",
+            "store",
+            "store_returns",
+            "store_sales",
+            "time_dim",
+            "warehouse",
+            "web_page",
+            "web_returns",
+            "web_sales",
+            "web_site");
 
-        assertEquals(expectedTableNames.size(), tableNames.size());
+    assertEquals(expectedTableNames.size(), tableNames.size());
 
-        for (int i = 0; i < tableNames.size(); i++) {
-            assertEquals(expectedTableNames.get(i), tableNames.get(i));
-        }
+    for (int i = 0; i < tableNames.size(); i++) {
+      assertEquals(expectedTableNames.get(i), tableNames.get(i));
     }
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
index 3f8c951..2cd1b0b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
@@ -17,76 +17,76 @@
  */
 package org.apache.beam.sdk.tpcds;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class TpcdsParametersReaderTest {
-    private TpcdsOptions tpcdsOptions;
-    private TpcdsOptions tpcdsOptionsError;
+  private TpcdsOptions tpcdsOptions;
+  private TpcdsOptions tpcdsOptionsError;
 
-    @Before
-    public void initializeTpcdsOptions() {
-        tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
-        tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
+  @Before
+  public void initializeTpcdsOptions() {
+    tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
+    tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
 
-        tpcdsOptions.setDataSize("1G");
-        tpcdsOptions.setQueries("1,2,3");
-        tpcdsOptions.setTpcParallel(2);
+    tpcdsOptions.setDataSize("1G");
+    tpcdsOptions.setQueries("1,2,3");
+    tpcdsOptions.setTpcParallel(2);
 
-        tpcdsOptionsError.setDataSize("5G");
-        tpcdsOptionsError.setQueries("0,100");
-        tpcdsOptionsError.setTpcParallel(0);
-    }
+    tpcdsOptionsError.setDataSize("5G");
+    tpcdsOptionsError.setQueries("0,100");
+    tpcdsOptionsError.setTpcParallel(0);
+  }
 
-    @Test
-    public void testGetAndCheckDataSize() throws Exception {
-        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
-        String expected = "1G";
-        assertEquals(expected, dataSize);
-    }
+  @Test
+  public void testGetAndCheckDataSize() throws Exception {
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String expected = "1G";
+    assertEquals(expected, dataSize);
+  }
 
-    @Test( expected = Exception.class)
-    public void testGetAndCheckDataSizeException() throws Exception {
-        TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void testGetAndCheckDataSizeException() throws Exception {
+    TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
+  }
 
-    @Test
-    public void testGetAndCheckQueries() throws Exception {
-        TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
-        tpcdsOptionsAll.setQueries("all");
-        String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
-        String[] expected = new String[99];
-        for (int i = 0; i < 99; i++) {
-            expected[i] = "query" + (i + 1);
-        }
-        Assert.assertArrayEquals(expected, queryNameArray);
+  @Test
+  public void testGetAndCheckQueries() throws Exception {
+    TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
+    tpcdsOptionsAll.setQueries("all");
+    String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
+    String[] expected = new String[99];
+    for (int i = 0; i < 99; i++) {
+      expected[i] = "query" + (i + 1);
     }
+    Assert.assertArrayEquals(expected, queryNameArray);
+  }
 
-    @Test
-    public void testGetAndCheckAllQueries() throws Exception {
-        String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
-        String[] expected = {"query1", "query2", "query3"};
-        Assert.assertArrayEquals(expected, queryNameArray);
-    }
+  @Test
+  public void testGetAndCheckAllQueries() throws Exception {
+    String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    String[] expected = {"query1", "query2", "query3"};
+    Assert.assertArrayEquals(expected, queryNameArray);
+  }
 
-    @Test( expected = Exception.class)
-    public void testGetAndCheckQueriesException() throws Exception {
-        TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void testGetAndCheckQueriesException() throws Exception {
+    TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
+  }
 
-    @Test
-    public void testGetAndCheckTpcParallel() throws Exception {
-        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-        int expected = 2;
-        assertEquals(expected, nThreads);
-    }
+  @Test
+  public void testGetAndCheckTpcParallel() throws Exception {
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+    int expected = 2;
+    assertEquals(expected, nThreads);
+  }
 
-    @Test( expected = Exception.class)
-    public void ttestGetAndCheckTpcParallelException() throws Exception {
-        TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
-    }
+  @Test(expected = Exception.class)
+  public void ttestGetAndCheckTpcParallelException() throws Exception {
+    TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
+  }
 }
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
index 05402f2..9c43935 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
@@ -17,108 +17,107 @@
  */
 package org.apache.beam.sdk.tpcds;
 
-import org.apache.beam.sdk.schemas.Schema;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TpcdsSchemasTest {
-    private Map<String, Schema> schemaMap;
-    private Map<String, Schema> immutableSchemaMap;
+  private Map<String, Schema> schemaMap;
+  private Map<String, Schema> immutableSchemaMap;
 
-    @Before
-    public void initializeMaps() throws Exception {
-        schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
-    }
+  @Before
+  public void initializeMaps() throws Exception {
+    schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
+  }
 
-    @Test
-    public void testCallCenterSchema() throws Exception {
-        Schema callCenterSchema =
-                Schema.builder()
-                        .addField("cc_call_center_sk", Schema.FieldType.INT64)
-                        .addField("cc_call_center_id", Schema.FieldType.STRING)
-                        .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
-                        .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
-                        .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cc_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_class", Schema.FieldType.STRING)
-                        .addNullableField("cc_employees", Schema.FieldType.INT64)
-                        .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
-                        .addNullableField("cc_hours", Schema.FieldType.STRING)
-                        .addNullableField("cc_manager", Schema.FieldType.STRING)
-                        .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
-                        .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
-                        .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
-                        .addNullableField("cc_market_manager", Schema.FieldType.STRING)
-                        .addNullableField("cc_division", Schema.FieldType.INT64)
-                        .addNullableField("cc_division_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_company", Schema.FieldType.INT64)
-                        .addNullableField("cc_company_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_number", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_name", Schema.FieldType.STRING)
-                        .addNullableField("cc_street_type", Schema.FieldType.STRING)
-                        .addNullableField("cc_suite_number", Schema.FieldType.STRING)
-                        .addNullableField("cc_city", Schema.FieldType.STRING)
-                        .addNullableField("cc_county", Schema.FieldType.STRING)
-                        .addNullableField("cc_state", Schema.FieldType.STRING)
-                        .addNullableField("cc_zip", Schema.FieldType.STRING)
-                        .addNullableField("cc_country", Schema.FieldType.STRING)
-                        .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
-                        .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
-                        .build();
+  @Test
+  public void testCallCenterSchema() throws Exception {
+    Schema callCenterSchema =
+        Schema.builder()
+            .addField("cc_call_center_sk", Schema.FieldType.INT64)
+            .addField("cc_call_center_id", Schema.FieldType.STRING)
+            .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+            .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+            .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cc_name", Schema.FieldType.STRING)
+            .addNullableField("cc_class", Schema.FieldType.STRING)
+            .addNullableField("cc_employees", Schema.FieldType.INT64)
+            .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+            .addNullableField("cc_hours", Schema.FieldType.STRING)
+            .addNullableField("cc_manager", Schema.FieldType.STRING)
+            .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+            .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+            .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+            .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+            .addNullableField("cc_division", Schema.FieldType.INT64)
+            .addNullableField("cc_division_name", Schema.FieldType.STRING)
+            .addNullableField("cc_company", Schema.FieldType.INT64)
+            .addNullableField("cc_company_name", Schema.FieldType.STRING)
+            .addNullableField("cc_street_number", Schema.FieldType.STRING)
+            .addNullableField("cc_street_name", Schema.FieldType.STRING)
+            .addNullableField("cc_street_type", Schema.FieldType.STRING)
+            .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+            .addNullableField("cc_city", Schema.FieldType.STRING)
+            .addNullableField("cc_county", Schema.FieldType.STRING)
+            .addNullableField("cc_state", Schema.FieldType.STRING)
+            .addNullableField("cc_zip", Schema.FieldType.STRING)
+            .addNullableField("cc_country", Schema.FieldType.STRING)
+            .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+            .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+            .build();
 
-        assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
-        assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
-    }
+    assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
+    assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
+  }
 
-    @Test
-    public void testCatalogPageSchemaNullable() throws Exception {
-        Schema catalogPageSchemaNullable =
-                Schema.builder()
-                        .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
-                        .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
-                        .addNullableField("cp_department", Schema.FieldType.STRING)
-                        .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
-                        .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
-                        .addNullableField("cp_description", Schema.FieldType.STRING)
-                        .addNullableField("cp_type", Schema.FieldType.STRING)
-                        .build();
+  @Test
+  public void testCatalogPageSchemaNullable() throws Exception {
+    Schema catalogPageSchemaNullable =
+        Schema.builder()
+            .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
+            .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+            .addNullableField("cp_department", Schema.FieldType.STRING)
+            .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+            .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+            .addNullableField("cp_description", Schema.FieldType.STRING)
+            .addNullableField("cp_type", Schema.FieldType.STRING)
+            .build();
 
-        assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
-        assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
-        assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
-    }
+    assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
+    assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+    assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+  }
 
-    @Test
-    public void testCustomerAddressSchemaNullable() throws Exception {
-        Schema customerAddressSchemaNullable =
-                Schema.builder()
-                        .addNullableField("ca_address_sk", Schema.FieldType.INT64)
-                        .addNullableField("ca_address_id", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_number", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_name", Schema.FieldType.STRING)
-                        .addNullableField("ca_street_type", Schema.FieldType.STRING)
-                        .addNullableField("ca_suite_number", Schema.FieldType.STRING)
-                        .addNullableField("ca_city", Schema.FieldType.STRING)
-                        .addNullableField("ca_county", Schema.FieldType.STRING)
-                        .addNullableField("ca_state", Schema.FieldType.STRING)
-                        .addNullableField("ca_zip", Schema.FieldType.STRING)
-                        .addNullableField("ca_country", Schema.FieldType.STRING)
-                        .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
-                        .addNullableField("ca_location_type", Schema.FieldType.STRING)
-                        .build();
+  @Test
+  public void testCustomerAddressSchemaNullable() throws Exception {
+    Schema customerAddressSchemaNullable =
+        Schema.builder()
+            .addNullableField("ca_address_sk", Schema.FieldType.INT64)
+            .addNullableField("ca_address_id", Schema.FieldType.STRING)
+            .addNullableField("ca_street_number", Schema.FieldType.STRING)
+            .addNullableField("ca_street_name", Schema.FieldType.STRING)
+            .addNullableField("ca_street_type", Schema.FieldType.STRING)
+            .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+            .addNullableField("ca_city", Schema.FieldType.STRING)
+            .addNullableField("ca_county", Schema.FieldType.STRING)
+            .addNullableField("ca_state", Schema.FieldType.STRING)
+            .addNullableField("ca_zip", Schema.FieldType.STRING)
+            .addNullableField("ca_country", Schema.FieldType.STRING)
+            .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+            .addNullableField("ca_location_type", Schema.FieldType.STRING)
+            .build();
 
-        assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
-        assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
-        assertEquals(immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
-    }
+    assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
+    assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+    assertEquals(
+        immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+  }
 }