Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/13 12:23:23 UTC
[beam] 01/04: [BEAM-11712] Make up-to-date build file and codestyle
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 28eec3f9d484fb7e9484c8f8c57c30a4fdaf867b
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Tue Mar 30 11:16:39 2021 +0200
[BEAM-11712] Make up-to-date build file and codestyle
---
sdks/java/testing/tpcds/build.gradle | 99 +-
.../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java | 328 +++--
.../java/org/apache/beam/sdk/tpcds/BeamTpcds.java | 54 +-
.../java/org/apache/beam/sdk/tpcds/CsvToRow.java | 47 +-
.../org/apache/beam/sdk/tpcds/QueryReader.java | 87 +-
.../java/org/apache/beam/sdk/tpcds/RowToCsv.java | 38 +-
.../apache/beam/sdk/tpcds/SqlTransformRunner.java | 317 +++--
.../apache/beam/sdk/tpcds/SummaryGenerator.java | 219 ++--
.../beam/sdk/tpcds/TableSchemaJSONLoader.java | 171 ++-
.../org/apache/beam/sdk/tpcds/TpcdsOptions.java | 26 +-
.../beam/sdk/tpcds/TpcdsOptionsRegistrar.java | 10 +-
.../beam/sdk/tpcds/TpcdsParametersReader.java | 136 +-
.../java/org/apache/beam/sdk/tpcds/TpcdsRun.java | 54 +-
.../org/apache/beam/sdk/tpcds/TpcdsRunResult.java | 123 +-
.../org/apache/beam/sdk/tpcds/TpcdsSchemas.java | 1338 ++++++++++----------
...pcdsOptionsRegistrar.java => package-info.java} | 16 +-
.../org/apache/beam/sdk/tpcds/QueryReaderTest.java | 361 +++---
.../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java | 260 ++--
.../beam/sdk/tpcds/TpcdsParametersReaderTest.java | 110 +-
.../apache/beam/sdk/tpcds/TpcdsSchemasTest.java | 183 ++-
20 files changed, 2138 insertions(+), 1839 deletions(-)
diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle
index c52ec7a..6237776 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -16,38 +16,97 @@
* limitations under the License.
*/
-plugins {
- id 'java'
-}
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.tpcds',
+ exportJavadoc: false,
+ archivesBaseName: 'beam-sdks-java-tpcds',
+)
-description = "Apache Beam :: SDKs :: Java :: TPC-DS Benchark"
+description = "Apache Beam :: SDKs :: Java :: TPC-DS"
-version '2.24.0-SNAPSHOT'
+// When running via Gradle, this property can be used to pass commandline arguments
+// to the TPC-DS run
+def tpcdsArgsProperty = "tpcds.args"
-sourceCompatibility = 1.8
+// When running via Gradle, this property sets the runner dependency
+def tpcdsRunnerProperty = "tpcds.runner"
+def tpcdsRunnerDependency = project.findProperty(tpcdsRunnerProperty)
+ ?: ":runners:direct-java"
+def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency)
+def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency)
+def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null
+
+if (isDataflowRunner) {
+ /*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure that
+ * the following projects are evaluated before we evaluate this project. This is because
+ * we are attempting to reference a property from the project directly.
+ */
+ evaluationDependsOn(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+}
-repositories {
- mavenCentral()
+configurations {
+ // A configuration for running the TPC-DS launcher directly from Gradle, which
+ // uses Gradle to put the appropriate dependencies on the Classpath rather than
+ // bundling them into a fat jar
+ gradleRun
}
dependencies {
- compile 'com.googlecode.json-simple:json-simple:1.1.1'
- compile project(path: ":sdks:java:core", configuration: "shadow")
- compile project(path: ":runners:google-cloud-dataflow-java")
- compile project(":sdks:java:io:google-cloud-platform")
+ compile library.java.vendored_guava_26_0_jre
+ compile library.java.vendored_calcite_1_20_0
+ compile library.java.commons_csv
+ compile library.java.slf4j_api
+ compile "com.googlecode.json-simple:json-simple:1.1.1"
+ compile "com.alibaba:fastjson:1.2.69"
compile project(":sdks:java:extensions:sql")
compile project(":sdks:java:extensions:sql:zetasql")
- compile group: 'com.google.auto.service', name: 'auto-service', version: '1.0-rc1'
- testCompile group: 'junit', name: 'junit', version: '4.12'
+ compile project(path: ":runners:google-cloud-dataflow-java")
+ compile project(path: ":sdks:java:core", configuration: "shadow")
+ testRuntimeClasspath library.java.slf4j_jdk14
+ testCompile project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntime")
+ gradleRun project(project.path)
+ gradleRun project(path: tpcdsRunnerDependency, configuration: runnerConfiguration)
+
+ // The Spark runner requires the user to provide a Spark dependency. For self-contained
+ // runs with the Spark runner, we can provide such a dependency. This is deliberately phrased
+ // to not hardcode any runner other than :runners:direct-java
+ if (shouldProvideSpark) {
+ gradleRun library.java.spark_core, {
+ exclude group:"org.slf4j", module:"jul-to-slf4j"
+ }
+ gradleRun library.java.spark_sql
+ gradleRun library.java.spark_streaming
+ }
}
-// When running via Gradle, this property can be used to pass commandline arguments
-// to the tpcds run
-def tpcdsArgsProperty = "tpcds.args"
+if (shouldProvideSpark) {
+ configurations.gradleRun {
+ // Using Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+ }
+}
task run(type: JavaExec) {
- main = "org.apache.beam.sdk.tpcds.BeamTpcds"
- classpath = sourceSets.main.runtimeClasspath
def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: ""
- args tpcdsArgsStr.split()
+ def tpcdsArgsList = new ArrayList<String>()
+ Collections.addAll(tpcdsArgsList, tpcdsArgsStr.split())
+
+ if (isDataflowRunner) {
+ dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
+
+ def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
+ project(":runners:google-cloud-dataflow-java:worker:legacy-worker")
+ .shadowJar.archivePath
+ // Provide job with a customizable worker jar.
+ // With legacy worker jar, containerImage is set to empty (i.e. to use the internal build).
+ // More context and discussions can be found in PR#6694.
+ tpcdsArgsList.add("--dataflowWorkerJar=${dataflowWorkerJar}".toString())
+ tpcdsArgsList.add('--workerHarnessContainerImage=')
+ }
+
+ main = "org.apache.beam.sdk.tpcds.BeamTpcds"
+ classpath = configurations.gradleRun
+ args tpcdsArgsList.toArray()
}
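For illustration only (assuming the default direct runner and the property names defined above, tpcds.args and tpcds.runner), the updated run task can be invoked from Gradle roughly like this; the pipeline flags mirror the examples in the BeamTpcds Javadoc below, and the --runner value must match whichever runner dependency is selected:

./gradlew :sdks:java:testing:tpcds:run \
    -Ptpcds.args="--dataSize=1G --queries=3,26,55 --tpcParallel=2"

./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark" \
    -Ptpcds.args="--dataSize=1G --queries=3,26,55 --tpcParallel=2 --runner=SparkRunner"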
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index f94b748..43b97d2 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -18,9 +18,16 @@
package org.apache.beam.sdk.tpcds;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -34,155 +41,198 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
+ * This class executes jobs using BeamSqlEnv; it uses BeamSqlEnv.executeDdl and
+ * BeamSqlEnv.parseQuery to run queries.
*/
public class BeamSqlEnvRunner {
- private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
- private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
- private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
- private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
- private static final Logger Log = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
-
- private static String buildTableCreateStatement(String tableName) {
- String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
- return createStatement;
+ private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+ private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+ private static final List<String> SUMMARY_HEADERS_LIST =
+ Arrays.asList(
+ "Query Name",
+ "Job Name",
+ "Data Size",
+ "Dialect",
+ "Status",
+ "Start Time",
+ "End Time",
+ "Elapsed Time(sec)");
+
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
+
+ private static String buildTableCreateStatement(String tableName) {
+ String createStatement =
+ "CREATE EXTERNAL TABLE "
+ + tableName
+ + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
+ return createStatement;
+ }
+
+ private static String buildDataLocation(String dataSize, String tableName) {
+ String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+ return dataLocation;
+ }
+
+ /**
+ * Register all tables into BeamSqlEnv, set their schemas, and set the locations where their
+ * corresponding data are stored. Currently this method is not supported by ZetaSQL planner.
+ */
+ private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize)
+ throws Exception {
+ List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+ for (String tableName : tableNames) {
+ String createStatement = buildTableCreateStatement(tableName);
+ String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+ String dataLocation = buildDataLocation(dataSize, tableName);
+ env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
}
-
- private static String buildDataLocation(String dataSize, String tableName) {
- String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
- return dataLocation;
+ }
+
+ /**
+ * Register all tables into InMemoryMetaStore, set their schemas, and set the locations where
+ * their corresponding data are stored.
+ */
+ private static void registerAllTablesByInMemoryMetaStore(
+ InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
+ JSONObject properties = new JSONObject();
+ properties.put("csvformat", "InformixUnload");
+
+ Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+ for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
+ String tableName = entry.getKey();
+ String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+ Schema tableSchema = schemaMap.get(tableName);
+ Table table =
+ Table.builder()
+ .name(tableName)
+ .schema(tableSchema)
+ .location(dataLocation)
+ .properties(properties)
+ .type("text")
+ .build();
+ inMemoryMetaStore.createTable(table);
}
-
- /** Register all tables into BeamSqlEnv, set their schemas, and set the locations where their corresponding data are stored.
- * Currently this method is not supported by ZetaSQL planner. */
- private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String dataSize) throws Exception {
- List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
- for (String tableName : tableNames) {
- String createStatement = buildTableCreateStatement(tableName);
- String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
- String dataLocation = buildDataLocation(dataSize, tableName);
- env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
- }
+ }
+
+ /**
+ * Print the summary table after all jobs are finished.
+ *
+ * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+ * @param numOfResults The number of results in the collection.
+ * @throws Exception
+ */
+ private static void printExecutionSummary(
+ CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+ List<List<String>> summaryRowsList = new ArrayList<>();
+ for (int i = 0; i < numOfResults; i++) {
+ TpcdsRunResult tpcdsRunResult = completion.take().get();
+ List<String> list = new ArrayList<>();
+ list.add(tpcdsRunResult.getQueryName());
+ list.add(tpcdsRunResult.getJobName());
+ list.add(tpcdsRunResult.getDataSize());
+ list.add(tpcdsRunResult.getDialect());
+ list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+ list.add(
+ tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+ summaryRowsList.add(list);
}
- /** Register all tables into InMemoryMetaStore, set their schemas, and set the locations where their corresponding data are stored. */
- private static void registerAllTablesByInMemoryMetaStore(InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
- JSONObject properties = new JSONObject();
- properties.put("csvformat", "InformixUnload");
-
- Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
- for (String tableName : schemaMap.keySet()) {
- String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
- Schema tableSchema = schemaMap.get(tableName);
- Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
- inMemoryMetaStore.createTable(table);
- }
+ System.out.println(SUMMARY_START);
+ System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+ }
+
+ /**
+ * This is the alternative method for BeamTpcds.main(). It runs the job using the
+ * BeamSqlEnv.parseQuery() method (does not perform well when running query96).
+ *
+ * @param args Command line arguments
+ * @throws Exception
+ */
+ public static void runUsingBeamSqlEnv(String[] args) throws Exception {
+ InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+ inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+ TpcdsOptions tpcdsOptions =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+ String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+ String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+ int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+ // Using ExecutorService and CompletionService to fulfill multi-threading functionality
+ ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+ // Directly create all tables and register them into inMemoryMetaStore before creating
+ // BeamSqlEnv object.
+ registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
+
+ BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
+ BeamSqlEnv env =
+ BeamSqlEnv.builder(inMemoryMetaStore)
+ .setPipelineOptions(beamSqlPipelineOptions)
+ .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
+ .build();
+
+ // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+ Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+ // Execute all queries, transform each result into a PCollection<String>, and write them into
+ // txt files stored in a GCS directory.
+ for (int i = 0; i < queryNameArr.length; i++) {
+ // For each query, get a copy of pipelineOptions from command line arguments, cast
+ // tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for
+ // pipeline execution.
+ TpcdsOptions tpcdsOptionsCopy =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+ DataflowPipelineOptions dataflowPipelineOptionsCopy =
+ tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
+
+ // Set a unique job name using the time stamp so that multiple different pipelines can run
+ // together.
+ dataflowPipelineOptionsCopy.setJobName(
+ queryNameArr[i] + "result" + System.currentTimeMillis());
+
+ pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+ String queryString = QueryReader.readQuery(queryNameArr[i]);
+
+ try {
+ // Query execution
+ PCollection<Row> rows =
+ BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+
+ // Transform the result from PCollection<Row> into PCollection<String>, and write it to the
+ // location where results are stored.
+ PCollection<String> rowStrings =
+ rows.apply(
+ MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()));
+ rowStrings.apply(
+ TextIO.write()
+ .to(
+ RESULT_DIRECTORY
+ + "/"
+ + dataSize
+ + "/"
+ + pipelines[i].getOptions().getJobName())
+ .withSuffix(".txt")
+ .withNumShards(1));
+ } catch (Exception e) {
+ LOG.error("{} failed to execute", queryNameArr[i]);
+ e.printStackTrace();
+ }
+
+ completion.submit(new TpcdsRun(pipelines[i]));
}
- /**
- * Print the summary table after all jobs are finished.
- * @param completion A collection of all TpcdsRunResult that are from finished jobs.
- * @param numOfResults The number of results in the collection.
- * @throws Exception
- */
- private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
- List<List<String>> summaryRowsList = new ArrayList<>();
- for (int i = 0; i < numOfResults; i++) {
- TpcdsRunResult tpcdsRunResult = completion.take().get();
- List<String> list = new ArrayList<>();
- list.add(tpcdsRunResult.getQueryName());
- list.add(tpcdsRunResult.getJobName());
- list.add(tpcdsRunResult.getDataSize());
- list.add(tpcdsRunResult.getDialect());
- list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
- list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
- list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
- list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
- summaryRowsList.add(list);
- }
-
- System.out.println(SUMMARY_START);
- System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
- }
+ executor.shutdown();
- /**
- * This is the alternative method in BeamTpcds.main method. Run job using BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96).
- * @param args Command line arguments
- * @throws Exception
- */
- public static void runUsingBeamSqlEnv(String[] args) throws Exception {
- InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
- inMemoryMetaStore.registerProvider(new TextTableProvider());
-
- TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
- String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
- String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
- int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
- // Using ExecutorService and CompletionService to fulfill multi-threading functionality
- ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
- // Directly create all tables and register them into inMemoryMetaStore before creating BeamSqlEnv object.
- registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
-
- BeamSqlPipelineOptions beamSqlPipelineOptions = tpcdsOptions.as(BeamSqlPipelineOptions.class);
- BeamSqlEnv env =
- BeamSqlEnv
- .builder(inMemoryMetaStore)
- .setPipelineOptions(beamSqlPipelineOptions)
- .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
- .build();
-
- // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
- Pipeline[] pipelines = new Pipeline[queryNameArr.length];
-
- // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
- for (int i = 0; i < queryNameArr.length; i++) {
- // For each query, get a copy of pipelineOptions from command line arguments, cast tpcdsOptions as a DataflowPipelineOptions object to read and set required parameters for pipeline execution.
- TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
- DataflowPipelineOptions dataflowPipelineOptionsCopy = tpcdsOptionsCopy.as(DataflowPipelineOptions.class);
-
- // Set a unique job name using the time stamp so that multiple different pipelines can run together.
- dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
- pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
- String queryString = QueryReader.readQuery(queryNameArr[i]);
-
- try {
- // Query execution
- PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
-
- // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
- PCollection<String> rowStrings = rows.apply(MapElements
- .into(TypeDescriptors.strings())
- .via((Row row) -> row.toString()));
- rowStrings.apply(TextIO.write().to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
- } catch (Exception e) {
- Log.error("{} failed to execute", queryNameArr[i]);
- e.printStackTrace();
- }
-
- completion.submit(new TpcdsRun(pipelines[i]));
- }
-
- executor.shutdown();
-
- printExecutionSummary(completion, queryNameArr.length);
- }
+ printExecutionSummary(completion, queryNameArr.length);
+ }
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
index a7f67de..6b25f65 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java
@@ -17,43 +17,33 @@
*/
package org.apache.beam.sdk.tpcds;
-
/**
* To execute this main() method, run the following example command from the command line.
*
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- * --queries=3,26,55 \
- * --tpcParallel=2 \
- * --project=apache-beam-testing \
- * --stagingLocation=gs://beamsql_tpcds_1/staging \
- * --tempLocation=gs://beamsql_tpcds_2/temp \
- * --runner=DataflowRunner \
- * --region=us-west1 \
- * --maxNumWorkers=10"
- *
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \
+ * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging
+ * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
*
- * To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite.
+ * <p>To run a query using the ZetaSQL planner (currently query96 can be run using ZetaSQL), set the
+ * plannerName as below. If not specified, the default planner is Calcite.
*
- * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
- * --queries=96 \
- * --tpcParallel=2 \
- * --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
- * --project=apache-beam-testing \
- * --stagingLocation=gs://beamsql_tpcds_1/staging \
- * --tempLocation=gs://beamsql_tpcds_2/temp \
- * --runner=DataflowRunner \
- * --region=us-west1 \
- * --maxNumWorkers=10"
+ * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \
+ * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \
+ * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \
+ * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \
+ * --maxNumWorkers=10"
*/
public class BeamTpcds {
- /**
- * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or BeamSqlEnvRunner.runUsingBeamSqlEnv()
- * Currently the former has better performance so it is chosen.
- *
- * @param args Command line arguments
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- SqlTransformRunner.runUsingSqlTransform(args);
- }
+ /**
+ * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or
+ * BeamSqlEnvRunner.runUsingBeamSqlEnv(). Currently the former has better performance, so it is
+ * chosen.
+ *
+ * @param args Command line arguments
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ SqlTransformRunner.runUsingSqlTransform(args);
+ }
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
index 22519b2..d66b128 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/CsvToRow.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.tpcds;
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+
+import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -25,34 +28,30 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
-
/**
- * A readConverter class for TextTable that can read csv file and transform it to PCollection<Row>
+ * A readConverter class for TextTable that can read a CSV file and transform it into PCollection<Row>.
*/
public class CsvToRow extends PTransform<PCollection<String>, PCollection<Row>>
- implements Serializable {
- private Schema schema;
- private CSVFormat csvFormat;
+ implements Serializable {
+ private Schema schema;
+ private CSVFormat csvFormat;
- public CSVFormat getCsvFormat() {
- return csvFormat;
- }
+ public CSVFormat getCsvFormat() {
+ return csvFormat;
+ }
- public CsvToRow(Schema schema, CSVFormat csvFormat) {
- this.schema = schema;
- this.csvFormat = csvFormat;
- }
+ public CsvToRow(Schema schema, CSVFormat csvFormat) {
+ this.schema = schema;
+ this.csvFormat = csvFormat;
+ }
- @Override
- public PCollection<Row> expand(PCollection<String> input) {
- return input
- .apply(
- "csvToRow",
- FlatMapElements.into(TypeDescriptors.rows())
- .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
- .setRowSchema(schema);
- }
+ @Override
+ public PCollection<Row> expand(PCollection<String> input) {
+ return input
+ .apply(
+ "csvToRow",
+ FlatMapElements.into(TypeDescriptors.rows())
+ .via(s -> csvLines2BeamRows(csvFormat, s, schema)))
+ .setRowSchema(schema);
+ }
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
index 1666c78..ca4cf63 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java
@@ -19,41 +19,66 @@ package org.apache.beam.sdk.tpcds;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
/**
- * The QueryReader reads query file (the file's extension is '.sql' and content doesn't end with a ';'), write the query as a string and return it.
+ * The QueryReader reads a query file (the file's extension is '.sql' and the content doesn't end
+ * with a ';') and returns the query as a string.
*/
public class QueryReader {
- /**
- * Reads a query file (.sql), return the query as a string.
- * @param queryFileName The name of the query file (such as "query1, query5...") which is stored in resource/queries directory
- * @return The query string stored in this file.
- * @throws Exception
- */
- public static String readQuery(String queryFileName) throws Exception {
- // Prepare the file reader.
- String queryFilePath = Objects.requireNonNull(QueryReader.class.getClassLoader().getResource("queries/" + queryFileName + ".sql")).getPath();
- File queryFile = new File(queryFilePath);
- FileReader fileReader = new FileReader(queryFile);
- BufferedReader reader = new BufferedReader(fileReader);
-
- // Read the file into stringBuilder.
- StringBuilder stringBuilder = new StringBuilder();
- String line;
- String ls = System.getProperty("line.separator");
- while ((line = reader.readLine()) != null) {
- stringBuilder.append(line);
- stringBuilder.append(ls);
- }
-
- // Delete the last new line separator.
- stringBuilder.deleteCharAt(stringBuilder.length() - 1);
- reader.close();
-
- String queryString = stringBuilder.toString();
-
- return queryString;
+ public static String readQuery(String queryFileName) throws Exception {
+ String path = "queries/" + queryFileName + ".sql";
+ String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+ return query;
+ }
+
+ /**
+ * Reads a query file (.sql), return the query as a string.
+ *
+ * @param queryFileName The name of the query file (such as "query1, query5...") which is stored
+ * in resource/queries directory
+ * @return The query string stored in this file.
+ * @throws Exception
+ */
+ public static String readQuery2(String queryFileName) throws Exception {
+
+ // Prepare the file reader.
+ ClassLoader classLoader = QueryReader.class.getClassLoader();
+ if (classLoader == null) {
+ throw new RuntimeException("Can't get classloader from QueryReader.");
+ }
+ String path = "queries/" + queryFileName + ".sql";
+
+ URL resource = classLoader.getResource(path);
+ if (resource == null) {
+ throw new RuntimeException("Resource for " + path + " can't be null.");
}
+ String queryFilePath = Objects.requireNonNull(resource).getPath();
+ File queryFile = new File(queryFilePath);
+ Reader fileReader =
+ new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8);
+ BufferedReader reader = new BufferedReader(fileReader);
+
+ // Read the file into stringBuilder.
+ StringBuilder stringBuilder = new StringBuilder();
+ String line;
+ String ls = System.getProperty("line.separator");
+ while ((line = reader.readLine()) != null) {
+ stringBuilder.append(line);
+ stringBuilder.append(ls);
+ }
+
+ // Delete the last new line separator.
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ reader.close();
+
+ return stringBuilder.toString();
+ }
}
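A minimal usage sketch (the example class below is hypothetical, not part of the commit): the new readQuery loads queries/<name>.sql from the classpath via Guava's Resources, so callers pass only the query name.

import org.apache.beam.sdk.tpcds.QueryReader;

public class QueryReaderExample {
  public static void main(String[] args) throws Exception {
    // Loads queries/query3.sql from the classpath and prints the SQL text.
    String queryString = QueryReader.readQuery("query3");
    System.out.println(queryString);
  }
}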
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
index 3bae75d..40a8cc5 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/RowToCsv.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.tpcds;
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+
+import java.io.Serializable;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -24,29 +27,26 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
-import java.io.Serializable;
-
-import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
-
/**
- * A writeConverter class for TextTable that can transform PCollection<Row> into PCollection<String>, the format of string is determined by CSVFormat
+ * A writeConverter class for TextTable that can transform PCollection<Row> into
+ * PCollection<String>; the format of the string is determined by CSVFormat.
*/
public class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
- implements Serializable {
- private CSVFormat csvFormat;
+ implements Serializable {
+ private CSVFormat csvFormat;
- public RowToCsv(CSVFormat csvFormat) {
- this.csvFormat = csvFormat;
- }
+ public RowToCsv(CSVFormat csvFormat) {
+ this.csvFormat = csvFormat;
+ }
- public CSVFormat getCsvFormat() {
- return csvFormat;
- }
+ public CSVFormat getCsvFormat() {
+ return csvFormat;
+ }
- @Override
- public PCollection<String> expand(PCollection<Row> input) {
- return input.apply(
- "rowToCsv",
- MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
- }
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+ return input.apply(
+ "rowToCsv",
+ MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
+ }
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index a40cd12..34274f9 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -17,6 +17,14 @@
*/
package org.apache.beam.sdk.tpcds;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -27,155 +35,184 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.*;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class executes jobs using PCollection and SqlTransform, it uses SqlTransform.query to run queries.
+ * This class executes jobs using PCollection and SqlTransform; it uses SqlTransform.query to run
+ * queries.
*/
public class SqlTransformRunner {
- private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
- private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
- private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
- private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
-
- private static final Logger Log = LoggerFactory.getLogger(SqlTransform.class);
-
- /**
- * Get all tables (in the form of TextTable) needed for a specific query execution
- * @param pipeline The pipeline that will be run to execute the query
- * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter (RowToCsv)
- * @param queryName The name of the query which will be executed (for example: query3, query55, query96)
- * @return A PCollectionTuple which is constructed by all tables needed for running query.
- * @throws Exception
- */
- private static PCollectionTuple getTables(Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
- Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
- TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
- String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
- String queryString = QueryReader.readQuery(queryName);
-
- PCollectionTuple tables = PCollectionTuple.empty(pipeline);
- for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
- String tableName = tableSchema.getKey();
-
- // Only when queryString contains tableName, the table is relevant to this query and will be added. This can avoid reading unnecessary data files.
- if (queryString.contains(tableName)) {
- // This is location path where the data are stored
- String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
-
- PCollection<Row> table =
- new TextTable(
- tableSchema.getValue(),
- filePattern,
- new CsvToRow(tableSchema.getValue(), csvFormat),
- new RowToCsv(csvFormat))
- .buildIOReader(pipeline.begin())
- .setCoder(SchemaCoder.of(tableSchema.getValue()))
- .setName(tableSchema.getKey());
-
- tables = tables.and(new TupleTag<>(tableName), table);
- }
- }
- return tables;
+ private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+ private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+ private static final List<String> SUMMARY_HEADERS_LIST =
+ Arrays.asList(
+ "Query Name",
+ "Job Name",
+ "Data Size",
+ "Dialect",
+ "Status",
+ "Start Time",
+ "End Time",
+ "Elapsed Time(sec)");
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);
+
+ /**
+ * Get all tables (in the form of TextTable) needed for a specific query execution.
+ *
+ * @param pipeline The pipeline that will be run to execute the query
+ * @param csvFormat The csvFormat to construct readConverter (CsvToRow) and writeConverter
+ * (RowToCsv)
+ * @param queryName The name of the query which will be executed (for example: query3, query55,
+ * query96)
+ * @return A PCollectionTuple which is constructed by all tables needed for running query.
+ * @throws Exception
+ */
+ private static PCollectionTuple getTables(
+ Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
+ Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+ TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
+ String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+ String queryString = QueryReader.readQuery(queryName);
+
+ PCollectionTuple tables = PCollectionTuple.empty(pipeline);
+ for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
+ String tableName = tableSchema.getKey();
+ System.out.println("tableName = " + tableName);
+
+ // The table is relevant to this query (and added) only when queryString contains tableName.
+ // This avoids reading unnecessary data files.
+ if (queryString.contains(tableName)) {
+ // This is location path where the data are stored
+ String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+ System.out.println("filePattern = " + filePattern);
+
+ PCollection<Row> table =
+ new TextTable(
+ tableSchema.getValue(),
+ filePattern,
+ new CsvToRow(tableSchema.getValue(), csvFormat),
+ new RowToCsv(csvFormat))
+ .buildIOReader(pipeline.begin())
+ .setCoder(SchemaCoder.of(tableSchema.getValue()))
+ .setName(tableSchema.getKey());
+
+ tables = tables.and(new TupleTag<>(tableName), table);
+ }
}
-
- /**
- * Print the summary table after all jobs are finished.
- * @param completion A collection of all TpcdsRunResult that are from finished jobs.
- * @param numOfResults The number of results in the collection.
- * @throws Exception
- */
- private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
- List<List<String>> summaryRowsList = new ArrayList<>();
- for (int i = 0; i < numOfResults; i++) {
- TpcdsRunResult tpcdsRunResult = completion.take().get();
- List<String> list = new ArrayList<>();
- list.add(tpcdsRunResult.getQueryName());
- list.add(tpcdsRunResult.getJobName());
- list.add(tpcdsRunResult.getDataSize());
- list.add(tpcdsRunResult.getDialect());
- // If the job is not successful, leave the run time related field blank
- list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
- list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
- list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
- list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
- summaryRowsList.add(list);
- }
-
- System.out.println(SUMMARY_START);
- System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+ return tables;
+ }
+
+ /**
+ * Print the summary table after all jobs are finished.
+ *
+ * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+ * @param numOfResults The number of results in the collection.
+ * @throws Exception
+ */
+ private static void printExecutionSummary(
+ CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+ List<List<String>> summaryRowsList = new ArrayList<>();
+ for (int i = 0; i < numOfResults; i++) {
+ TpcdsRunResult tpcdsRunResult = completion.take().get();
+ List<String> list = new ArrayList<>();
+ list.add(tpcdsRunResult.getQueryName());
+ list.add(tpcdsRunResult.getJobName());
+ list.add(tpcdsRunResult.getDataSize());
+ list.add(tpcdsRunResult.getDialect());
+ // If the job is not successful, leave the run time related field blank
+ list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+ list.add(
+ tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+ summaryRowsList.add(list);
}
- /**
- * This is the default method in BeamTpcds.main method. Run job using SqlTranform.query() method.
- * @param args Command line arguments
- * @throws Exception
- */
- public static void runUsingSqlTransform(String[] args) throws Exception {
- TpcdsOptions tpcdsOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
- String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
- String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
- int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
-
- // Using ExecutorService and CompletionService to fulfill multi-threading functionality
- ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
-
- // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
- Pipeline[] pipelines = new Pipeline[queryNameArr.length];
- CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
-
- // Execute all queries, transform the each result into a PCollection<String>, write them into the txt file and store in a GCP directory.
- for (int i = 0; i < queryNameArr.length; i++) {
- // For each query, get a copy of pipelineOptions from command line arguments.
- TpcdsOptions tpcdsOptionsCopy = PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
-
- // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the default one is Calcite, can change to ZetaSQL).
- BeamSqlPipelineOptions beamSqlPipelineOptionsCopy = tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
-
- // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set other required pipeline optionsparameters .
- DataflowPipelineOptions dataflowPipelineOptionsCopy = beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
-
- // Set a unique job name using the time stamp so that multiple different pipelines can run together.
- dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());
-
- pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
- String queryString = QueryReader.readQuery(queryNameArr[i]);
- PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
-
- try {
- tables
- .apply(
- SqlTransform.query(queryString))
- .apply(
- MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
- .apply(TextIO.write()
- .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
- .withSuffix(".txt")
- .withNumShards(1));
- } catch (Exception e) {
- Log.error("{} failed to execute", queryNameArr[i]);
- e.printStackTrace();
- }
-
- completion.submit(new TpcdsRun(pipelines[i]));
- }
-
- executor.shutdown();
-
- printExecutionSummary(completion, queryNameArr.length);
+ System.out.println(SUMMARY_START);
+ System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+ }
+
+ /**
+ * This is the default method in BeamTpcds.main(). It runs the job using SqlTransform.query().
+ *
+ * @param args Command line arguments
+ * @throws Exception
+ */
+ public static void runUsingSqlTransform(String[] args) throws Exception {
+ TpcdsOptions tpcdsOptions =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+ String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+ String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+ int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+ // Using ExecutorService and CompletionService to fulfill multi-threading functionality
+ ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
+
+ // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
+ Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+ CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
+
+ // Execute all queries, transform each result into a PCollection<String>, and write them into
+ // txt files stored in a GCS directory.
+ for (int i = 0; i < queryNameArr.length; i++) {
+ // For each query, get a copy of pipelineOptions from command line arguments.
+ TpcdsOptions tpcdsOptionsCopy =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+ // Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the
+ // default one is Calcite; it can be changed to ZetaSQL).
+ BeamSqlPipelineOptions beamSqlPipelineOptionsCopy =
+ tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);
+
+ // Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set
+ // other required pipeline options and parameters.
+ DataflowPipelineOptions dataflowPipelineOptionsCopy =
+ beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);
+
+ // Set a unique job name using the time stamp so that multiple different pipelines can run
+ // together.
+ dataflowPipelineOptionsCopy.setJobName(
+ queryNameArr[i] + "result" + System.currentTimeMillis());
+
+ pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
+ String queryString = QueryReader.readQuery(queryNameArr[i]);
+ PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
+
+ try {
+ tables
+ .apply(SqlTransform.query(queryString))
+ .apply(MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+ .apply(
+ TextIO.write()
+ .to(
+ RESULT_DIRECTORY
+ + "/"
+ + dataSize
+ + "/"
+ + pipelines[i].getOptions().getJobName())
+ .withSuffix(".txt")
+ .withNumShards(1));
+ } catch (Exception e) {
+ LOG.error("{} failed to execute", queryNameArr[i]);
+ e.printStackTrace();
+ }
+
+ completion.submit(new TpcdsRun(pipelines[i]));
}
+
+ executor.shutdown();
+
+ printExecutionSummary(completion, queryNameArr.length);
+ }
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
index bddb6a8..36f7a28 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
@@ -20,134 +20,157 @@ package org.apache.beam.sdk.tpcds;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
-/**
- * Generate the tpcds queries execution summary on the command line after finishing all jobs.
- */
+/** Generate the tpcds queries execution summary on the command line after finishing all jobs. */
public class SummaryGenerator {
- private static final int PADDING_SIZE = 2;
- private static final String NEW_LINE = "\n";
- private static final String TABLE_JOINT_SYMBOL = "+";
- private static final String TABLE_V_SPLIT_SYMBOL = "|";
- private static final String TABLE_H_SPLIT_SYMBOL = "-";
+ private static final int PADDING_SIZE = 2;
+ private static final String NEW_LINE = "\n";
+ private static final String TABLE_JOINT_SYMBOL = "+";
+ private static final String TABLE_V_SPLIT_SYMBOL = "|";
+ private static final String TABLE_H_SPLIT_SYMBOL = "-";
- public static String generateTable(List<String> headersList, List<List<String>> rowsList,int... overRiddenHeaderHeight) {
- StringBuilder stringBuilder = new StringBuilder();
+ public static String generateTable(
+ List<String> headersList, List<List<String>> rowsList, int... overRiddenHeaderHeight) {
+ StringBuilder stringBuilder = new StringBuilder();
- int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
+ int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
- Map<Integer,Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
+ Map<Integer, Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
- stringBuilder.append(NEW_LINE);
- stringBuilder.append(NEW_LINE);
- createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
- stringBuilder.append(NEW_LINE);
+ stringBuilder.append(NEW_LINE);
+ stringBuilder.append(NEW_LINE);
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ stringBuilder.append(NEW_LINE);
- for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
- fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
- }
-
- stringBuilder.append(NEW_LINE);
+ for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
+ fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
+ }
- createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ stringBuilder.append(NEW_LINE);
- for (List<String> row : rowsList) {
- for (int i = 0; i < rowHeight; i++) {
- stringBuilder.append(NEW_LINE);
- }
- for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
- fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
- }
- }
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ for (List<String> row : rowsList) {
+ for (int i = 0; i < rowHeight; i++) {
stringBuilder.append(NEW_LINE);
- createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
- stringBuilder.append(NEW_LINE);
- stringBuilder.append(NEW_LINE);
-
- return stringBuilder.toString();
+ }
+ for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
+ fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
+ }
}
- private static void fillSpace(StringBuilder stringBuilder, int length) {
- for (int i = 0; i < length; i++) {
- stringBuilder.append(" ");
- }
- }
+ stringBuilder.append(NEW_LINE);
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ stringBuilder.append(NEW_LINE);
+ stringBuilder.append(NEW_LINE);
- /** Add a rowLine at the beginning, the middle between headersList and rowLists, the end of the summary table. */
- private static void createRowLine(StringBuilder stringBuilder,int headersListSize, Map<Integer,Integer> columnMaxWidthMapping) {
- for (int i = 0; i < headersListSize; i++) {
- if(i == 0) {
- stringBuilder.append(TABLE_JOINT_SYMBOL);
- }
-
- for (int j = 0; j < columnMaxWidthMapping.get(i) + PADDING_SIZE * 2 ; j++) {
- stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
- }
- stringBuilder.append(TABLE_JOINT_SYMBOL);
- }
- }
+ return stringBuilder.toString();
+ }
- /** Get the width of the summary table. */
- private static Map<Integer,Integer> getMaximumWidthofTable(List<String> headersList, List<List<String>> rowsList) {
- Map<Integer,Integer> columnMaxWidthMapping = new HashMap<>();
+ private static void fillSpace(StringBuilder stringBuilder, int length) {
+ for (int i = 0; i < length; i++) {
+ stringBuilder.append(" ");
+ }
+ }
+
+ /**
+ * Add a rowLine at the beginning, in the middle between headersList and rowsList, and at the end
+ * of the summary table.
+ */
+ private static void createRowLine(
+ StringBuilder stringBuilder,
+ int headersListSize,
+ Map<Integer, Integer> columnMaxWidthMapping) {
+ for (int i = 0; i < headersListSize; i++) {
+ if (i == 0) {
+ stringBuilder.append(TABLE_JOINT_SYMBOL);
+ }
+
+ int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(i)).orElse(0);
+ for (int j = 0; j < columnMaxWidth + PADDING_SIZE * 2; j++) {
+ stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
+ }
+ stringBuilder.append(TABLE_JOINT_SYMBOL);
+ }
+ }
- for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
- columnMaxWidthMapping.put(columnIndex, 0);
- }
+ /** Get the maximum width of each column of the summary table. */
+ private static Map<Integer, Integer> getMaximumWidthofTable(
+ List<String> headersList, List<List<String>> rowsList) {
+ Map<Integer, Integer> columnMaxWidthMapping = new HashMap<>();
- for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
- if(headersList.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
- columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
- }
- }
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ columnMaxWidthMapping.put(columnIndex, 0);
+ }
- for (List<String> row : rowsList) {
- for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
- if(row.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
- columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
- }
- }
- }
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ Integer columnMaxWidth =
+ Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+ if (headersList.get(columnIndex).length() > columnMaxWidth) {
+ columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
+ }
+ }
- for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
- if(columnMaxWidthMapping.get(columnIndex) % 2 != 0) {
- columnMaxWidthMapping.put(columnIndex, columnMaxWidthMapping.get(columnIndex) + 1);
- }
+ for (List<String> row : rowsList) {
+ for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
+ Integer columnMaxWidth =
+ Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+ if (row.get(columnIndex).length() > columnMaxWidth) {
+ columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
}
-
- return columnMaxWidthMapping;
+ }
}
- private static int getOptimumCellPadding(int cellIndex,int datalength,Map<Integer,Integer> columnMaxWidthMapping,int cellPaddingSize) {
- if(datalength % 2 != 0) {
- datalength++;
- }
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(columnIndex)).orElse(0);
+ if (columnMaxWidth % 2 != 0) {
+ columnMaxWidthMapping.put(columnIndex, columnMaxWidth + 1);
+ }
+ }
- if(datalength < columnMaxWidthMapping.get(cellIndex)) {
- cellPaddingSize = cellPaddingSize + (columnMaxWidthMapping.get(cellIndex) - datalength) / 2;
- }
+ return columnMaxWidthMapping;
+ }
- return cellPaddingSize;
+ private static int getOptimumCellPadding(
+ int cellIndex,
+ int datalength,
+ Map<Integer, Integer> columnMaxWidthMapping,
+ int cellPaddingSize) {
+ if (datalength % 2 != 0) {
+ datalength++;
}
- /** Use space to fill a single cell with optimum cell padding size. */
- private static void fillCell(StringBuilder stringBuilder,String cell,int cellIndex,Map<Integer,Integer> columnMaxWidthMapping) {
+ int columnMaxWidth = Optional.ofNullable(columnMaxWidthMapping.get(cellIndex)).orElse(0);
+ if (datalength < columnMaxWidth) {
+ cellPaddingSize = cellPaddingSize + (columnMaxWidth - datalength) / 2;
+ }
- int cellPaddingSize = getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
+ return cellPaddingSize;
+ }
- if(cellIndex == 0) {
- stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
- }
+ /** Use space to fill a single cell with optimum cell padding size. */
+ private static void fillCell(
+ StringBuilder stringBuilder,
+ String cell,
+ int cellIndex,
+ Map<Integer, Integer> columnMaxWidthMapping) {
- fillSpace(stringBuilder, cellPaddingSize);
- stringBuilder.append(cell);
- if(cell.length() % 2 != 0) {
- stringBuilder.append(" ");
- }
+ int cellPaddingSize =
+ getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
- fillSpace(stringBuilder, cellPaddingSize);
+ if (cellIndex == 0) {
+ stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+ }
- stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+ fillSpace(stringBuilder, cellPaddingSize);
+ stringBuilder.append(cell);
+ if (cell.length() % 2 != 0) {
+ stringBuilder.append(" ");
}
+
+ fillSpace(stringBuilder, cellPaddingSize);
+
+ stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+ }
}
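
For context on the padding math in SummaryGenerator above: each column's maximum width is rounded up to an even number, and a cell narrower than its column gets half of the leftover width added to the base padding on each side, which centers the value. Below is a minimal, self-contained sketch of that calculation; it is illustrative only, not part of this commit, and the class name, method name and the PADDING_SIZE value of 2 are assumptions.

    import java.util.HashMap;
    import java.util.Map;

    public class CellPaddingSketch {
      private static final int PADDING_SIZE = 2; // assumed base padding, as in SummaryGenerator

      // Mirrors getOptimumCellPadding: treat odd-length values as one character wider,
      // then add half of the remaining column width to the base padding.
      static int optimumPadding(int cellIndex, int dataLength, Map<Integer, Integer> columnWidths) {
        if (dataLength % 2 != 0) {
          dataLength++;
        }
        int columnWidth = columnWidths.getOrDefault(cellIndex, 0);
        if (dataLength < columnWidth) {
          return PADDING_SIZE + (columnWidth - dataLength) / 2;
        }
        return PADDING_SIZE;
      }

      public static void main(String[] args) {
        Map<Integer, Integer> columnWidths = new HashMap<>();
        columnWidths.put(0, 10);
        // "abc" is padded as if it were 4 wide, so padding = 2 + (10 - 4) / 2 = 5 per side.
        System.out.println(optimumPadding(0, "abc".length(), columnWidths));
      }
    }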
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
index 420386c..3a2371d 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java
@@ -17,96 +17,127 @@
*/
package org.apache.beam.sdk.tpcds;
-import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-
import java.io.File;
-import java.io.FileReader;
+import java.net.URL;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.ArrayList;
-
+import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
/**
- * TableSchemaJSONLoader can get all table's names from resource/schemas directory and parse a table's schema into a string.
+ * TableSchemaJSONLoader can get all tables' names from resource/schemas directory and parse a
+ * table's schema into a string.
*/
public class TableSchemaJSONLoader {
- /**
- * Read a table schema json file from resource/schemas directory, parse the file into a string which can be utilized by BeamSqlEnv.executeDdl method.
- * @param tableName The name of the json file to be read (fo example: item, store_sales).
- * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk bigint, d_date_id varchar"
- * @throws Exception
- */
- public static String parseTableSchema(String tableName) throws Exception {
- String tableFilePath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas/" + tableName +".json")).getPath();
+ public static String readQuery(String tableName) throws Exception {
+ String path = "schemas/" + tableName + ".json";
+ String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+ return fixture;
+ }
- JSONObject jsonObject = (JSONObject) new JSONParser().parse(new FileReader(new File(tableFilePath)));
- JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+ /**
+ * Read a table schema json file from resource/schemas directory, parse the file into a string
+ * which can be utilized by BeamSqlEnv.executeDdl method.
+ *
+   * @param tableName The name of the json file to be read (for example: item, store_sales).
+ * @return A string that matches the format in BeamSqlEnv.executeDdl method, such as "d_date_sk
+ * bigint, d_date_id varchar"
+ * @throws Exception
+ */
+ @SuppressWarnings({"rawtypes", "DefaultCharset"})
+ public static String parseTableSchema(String tableName) throws Exception {
+ String path = "schemas/" + tableName + ".json";
+ String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8);
+ System.out.println("schema = " + schema);
- // Iterate each element in jsonArray to construct the schema string
- StringBuilder schemaStringBuilder = new StringBuilder();
+ JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema);
+ JSONArray jsonArray = (JSONArray) jsonObject.get("schema");
+ if (jsonArray == null) {
+ throw new RuntimeException("Can't get Json array for \"schema\" key.");
+ }
- Iterator jsonArrIterator = jsonArray.iterator();
- Iterator<Map.Entry> recordIterator;
- while (jsonArrIterator.hasNext()) {
- recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
- while (recordIterator.hasNext()) {
- Map.Entry pair = recordIterator.next();
+ // Iterate each element in jsonArray to construct the schema string
+ StringBuilder schemaStringBuilder = new StringBuilder();
- if (pair.getKey().equals("type")) {
- // If the key of the pair is "type", make some modification before appending it to the schemaStringBuilder, then append a comma.
- String typeName = (String) pair.getValue();
- if (typeName.toLowerCase().equals("identifier") || typeName.toLowerCase().equals("integer")) {
- // Use long type to represent int, prevent overflow
- schemaStringBuilder.append("bigint");
- } else if (typeName.contains("decimal")) {
- // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it for now.
- schemaStringBuilder.append("double");
- } else {
- // Currently Beam SQL doesn't handle "date" type properly, use "varchar" replace it for now.
- schemaStringBuilder.append("varchar");
- }
- schemaStringBuilder.append(',');
- } else {
- // If the key of the pair is "name", directly append it to the StringBuilder, then append a space.
- schemaStringBuilder.append((pair.getValue()));
- schemaStringBuilder.append(' ');
- }
- }
- }
+ Iterator jsonArrIterator = jsonArray.iterator();
+ Iterator<Map.Entry> recordIterator;
+ while (jsonArrIterator.hasNext()) {
+ recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator();
+ while (recordIterator.hasNext()) {
+ Map.Entry pair = recordIterator.next();
- // Delete the last ',' in schema string
- if (schemaStringBuilder.length() > 0) {
- schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
+ if (pair.getKey().equals("type")) {
+ // If the key of the pair is "type", make some modification before appending it to the
+ // schemaStringBuilder, then append a comma.
+ String typeName = (String) pair.getValue();
+ if (typeName.toLowerCase().equals("identifier")
+ || typeName.toLowerCase().equals("integer")) {
+ // Use long type to represent int, prevent overflow
+ schemaStringBuilder.append("bigint");
+ } else if (typeName.contains("decimal")) {
+ // Currently Beam SQL doesn't handle "decimal" type properly, use "double" to replace it
+ // for now.
+ schemaStringBuilder.append("double");
+ } else {
+          // Currently Beam SQL doesn't handle "date" type properly, use "varchar" to replace it for
+ // now.
+ schemaStringBuilder.append("varchar");
+ }
+ schemaStringBuilder.append(',');
+ } else {
+ // If the key of the pair is "name", directly append it to the StringBuilder, then append
+ // a space.
+ schemaStringBuilder.append((pair.getValue()));
+ schemaStringBuilder.append(' ');
}
+ }
+ }
- String schemaString = schemaStringBuilder.toString();
-
- return schemaString;
+ // Delete the last ',' in schema string
+ if (schemaStringBuilder.length() > 0) {
+ schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1);
}
- /**
- * Get all tables' names. Tables are stored in resource/schemas directory in the form of json files, such as "item.json", "store_sales.json", they'll be converted to "item", "store_sales".
- * @return The list of names of all tables.
- */
- public static List<String> getAllTableNames() {
- String tableDirPath = Objects.requireNonNull(TableSchemaJSONLoader.class.getClassLoader().getResource("schemas")).getPath();
- File tableDir = new File(tableDirPath);
- File[] tableDirListing = tableDir.listFiles();
+ String schemaString = schemaStringBuilder.toString();
- List<String> tableNames = new ArrayList<>();
+ return schemaString;
+ }
- if (tableDirListing != null) {
- for (File file : tableDirListing) {
- // Remove the .json extension in file name
- tableNames.add(FileNameUtils.getBaseName((file.getName())));
- }
- }
+ /**
+ * Get all tables' names. Tables are stored in resource/schemas directory in the form of json
+ * files, such as "item.json", "store_sales.json", they'll be converted to "item", "store_sales".
+ *
+ * @return The list of names of all tables.
+ */
+ public static List<String> getAllTableNames() {
+ ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader();
+ if (classLoader == null) {
+ throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader.");
+ }
+ URL resource = classLoader.getResource("schemas");
+ if (resource == null) {
+ throw new RuntimeException("Resource for \"schemas\" can't be null.");
+ }
+ String tableDirPath = Objects.requireNonNull(resource).getPath();
+ File tableDir = new File(tableDirPath);
+ File[] tableDirListing = tableDir.listFiles();
- return tableNames;
+ List<String> tableNames = new ArrayList<>();
+
+ if (tableDirListing != null) {
+ for (File file : tableDirListing) {
+ // Remove the .json extension in file name
+ tableNames.add(FileNameUtils.getBaseName((file.getName())));
+ }
}
+
+ return tableNames;
+ }
}
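
As a usage note for the loader above: parseTableSchema returns a comma-separated "name type" list that can be dropped into a CREATE EXTERNAL TABLE statement for BeamSqlEnv.executeDdl. A rough, illustrative sketch follows; it is not part of this commit, it assumes the class lives in the same package as TableSchemaJSONLoader, and the table TYPE and LOCATION are placeholders.

    public class DdlSketch {
      public static void main(String[] args) throws Exception {
        // e.g. "i_item_sk bigint,i_item_id varchar,..." for the item table
        String columns = TableSchemaJSONLoader.parseTableSchema("item");
        String ddl =
            "CREATE EXTERNAL TABLE item (" + columns + ") "
                + "TYPE text LOCATION '/path/to/item.dat'"; // placeholder type/location
        System.out.println(ddl);
      }
    }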
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
index 1c567dd..c693dfd 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java
@@ -21,22 +21,24 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-/** Options used to configure TPC-DS test */
+/** Options used to configure TPC-DS test. */
public interface TpcdsOptions extends PipelineOptions {
- @Description("The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
- String getDataSize();
+ @Description(
+      "The size of TPC-DS data to run queries on; user input should contain the unit, such as '1G', '10G'")
+ String getDataSize();
- void setDataSize(String dataSize);
+ void setDataSize(String dataSize);
- // Set the return type to be String since reading from the command line (user input will be like "1,2,55" which represent TPC-DS query1, query3, query55)
- @Description("The queries numbers, read user input as string, numbers separated by commas")
- String getQueries();
+ // Set the return type to be String since reading from the command line (user input will be like
+ // "1,2,55" which represent TPC-DS query1, query3, query55)
+  @Description("The query numbers, read from user input as a string of numbers separated by commas")
+ String getQueries();
- void setQueries(String queries);
+ void setQueries(String queries);
- @Description("The number of queries to run in parallel")
- @Default.Integer(1)
- Integer getTpcParallel();
+ @Description("The number of queries to run in parallel")
+ @Default.Integer(1)
+ Integer getTpcParallel();
- void setTpcParallel(Integer parallelism);
+ void setTpcParallel(Integer parallelism);
}
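
For reference, these are ordinary Beam pipeline options, so they can be parsed with PipelineOptionsFactory. A minimal, illustrative snippet (argument values are made up; the class is assumed to sit in the same package as TpcdsOptions):

    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class TpcdsOptionsSketch {
      public static void main(String[] args) {
        // Parse made-up command-line style arguments into TpcdsOptions.
        TpcdsOptions options =
            PipelineOptionsFactory.fromArgs("--dataSize=1G", "--queries=3,7,55", "--tpcParallel=2")
                .as(TpcdsOptions.class);
        System.out.println(options.getDataSize() + " / " + options.getQueries());
      }
    }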
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
index d1ddc9d..40f4f86 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
@@ -24,10 +24,10 @@ import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.Immutabl
/** {@link AutoService} registrar for {@link TpcdsOptions}. */
@AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
+public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.of(TpcdsOptions.class);
- }
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.of(TpcdsOptions.class);
+ }
}
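
The @AutoService registration above is what lets PipelineOptionsFactory discover TpcdsOptions from the classpath. Without the registrar, callers would have to register the interface by hand, roughly as in this illustrative sketch (not part of the commit):

    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ManualRegistrationSketch {
      public static void main(String[] args) {
        // Only needed when no PipelineOptionsRegistrar is on the classpath;
        // the @AutoService registrar above makes this call unnecessary.
        PipelineOptionsFactory.register(TpcdsOptions.class);
      }
    }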
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
index c281341..8928292 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
@@ -22,86 +22,88 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-/**
- * Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid
- */
+/** Get and check the TpcdsOptions' parameters, throw exceptions when user input is invalid. */
public class TpcdsParametersReader {
- /** The data sizes that have been supported. */
- private static final Set<String> supportedDataSizes = Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
-
- /**
- * Get and check dataSize entered by user. This dataSize has to have been supported.
- *
- * @param tpcdsOptions TpcdsOptions object constructed from user input
- * @return The dateSize user entered, if it is contained in supportedDataSizes set.
- * @throws Exception
- */
- public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
- String dataSize = tpcdsOptions.getDataSize();
+ /** The data sizes that have been supported. */
+ private static final Set<String> supportedDataSizes =
+ Stream.of("1G", "10G", "100G").collect(Collectors.toCollection(HashSet::new));
- if (!supportedDataSizes.contains(dataSize)) {
- throw new Exception("The data size you entered has not been supported.");
- }
+ /**
+ * Get and check dataSize entered by user. This dataSize has to have been supported.
+ *
+ * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return The dataSize user entered, if it is contained in supportedDataSizes set.
+ * @throws Exception
+ */
+ public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws Exception {
+ String dataSize = tpcdsOptions.getDataSize();
- return dataSize;
+ if (!supportedDataSizes.contains(dataSize)) {
+      throw new Exception("The data size you entered is not supported.");
}
- /**
- * Get and check queries entered by user. This has to be a string of numbers separated by commas or "all" which means run all 99 queiries.
- * All query numbers have to be between 1 and 99.
- *
- * @param tpcdsOptions TpcdsOptions object constructed from user input
- * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
- * @throws Exception
- */
- public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
- String queryNums = tpcdsOptions.getQueries();
+ return dataSize;
+ }
- String[] queryNumArr;
- if (queryNums.toLowerCase().equals("all")) {
- // All 99 TPC-DS queries need to be executed.
- queryNumArr = new String[99];
- for (int i = 0; i < 99; i++) {
- queryNumArr[i] = Integer.toString(i + 1);
- }
- } else {
- // Split user input queryNums by spaces and commas, get an array of all query numbers.
- queryNumArr = queryNums.split("[\\s,]+");
+ /**
+ * Get and check queries entered by user. This has to be a string of numbers separated by commas
+   * or "all" which means run all 99 queries. All query numbers have to be between 1 and 99.
+ *
+ * @param tpcdsOptions TpcdsOptions object constructed from user input
+ * @return An array of query names, for example "1,2,7" will be output as "query1,query2,query7"
+ * @throws Exception
+ */
+ public static String[] getAndCheckQueryNameArray(TpcdsOptions tpcdsOptions) throws Exception {
+ String queryNums = tpcdsOptions.getQueries();
- for (String queryNumStr : queryNumArr) {
- try {
- int queryNum = Integer.parseInt(queryNumStr);
- if (queryNum < 1 || queryNum > 99) {
- throw new Exception("The queries you entered contains invalid query number, please provide integers between 1 and 99.");
- }
- } catch (NumberFormatException e) {
- System.out.println("The queries you entered should be integers, please provide integers between 1 and 99.");
- }
- }
- }
+ String[] queryNumArr;
+ if (queryNums.toLowerCase().equals("all")) {
+ // All 99 TPC-DS queries need to be executed.
+ queryNumArr = new String[99];
+ for (int i = 0; i < 99; i++) {
+ queryNumArr[i] = Integer.toString(i + 1);
+ }
+ } else {
+ // Split user input queryNums by spaces and commas, get an array of all query numbers.
+ queryNumArr = queryNums.split("[\\s,]+");
- String[] queryNameArr = new String[queryNumArr.length];
- for (int i = 0; i < queryNumArr.length; i++) {
- queryNameArr[i] = "query" + queryNumArr[i];
+ for (String queryNumStr : queryNumArr) {
+ try {
+ int queryNum = Integer.parseInt(queryNumStr);
+ if (queryNum < 1 || queryNum > 99) {
+ throw new Exception(
+                "The queries you entered contain an invalid query number, please provide integers between 1 and 99.");
+ }
+ } catch (NumberFormatException e) {
+ System.out.println(
+ "The queries you entered should be integers, please provide integers between 1 and 99.");
}
+ }
+ }
- return queryNameArr;
+ String[] queryNameArr = new String[queryNumArr.length];
+ for (int i = 0; i < queryNumArr.length; i++) {
+ queryNameArr[i] = "query" + queryNumArr[i];
}
- /**
- * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
- *
- * @param tpcdsOptions TpcdsOptions object constructed from user input.
- * @return The TpcParallel user entered.
- * @throws Exception
- */
- public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
- int nThreads = tpcdsOptions.getTpcParallel();
+ return queryNameArr;
+ }
- if (nThreads < 1 || nThreads > 99) {
- throw new Exception("The TpcParallel your entered is invalid, please provide an integer between 1 and 99.");
- }
+ /**
+ * Get and check TpcParallel entered by user. This has to be an integer between 1 and 99.
+ *
+ * @param tpcdsOptions TpcdsOptions object constructed from user input.
+ * @return The TpcParallel user entered.
+ * @throws Exception
+ */
+ public static int getAndCheckTpcParallel(TpcdsOptions tpcdsOptions) throws Exception {
+ int nThreads = tpcdsOptions.getTpcParallel();
- return nThreads;
+ if (nThreads < 1 || nThreads > 99) {
+ throw new Exception(
+          "The TpcParallel you entered is invalid, please provide an integer between 1 and 99.");
}
+
+ return nThreads;
+ }
}
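
To illustrate the parsing above: query numbers may be separated by commas and/or whitespace, and each number is turned into a "queryN" name. A small, illustrative sketch (not part of the commit; assumed to live in the same package as TpcdsParametersReader):

    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ParametersReaderSketch {
      public static void main(String[] args) throws Exception {
        TpcdsOptions options =
            PipelineOptionsFactory.fromArgs("--dataSize=1G", "--queries=3, 7,55")
                .as(TpcdsOptions.class);
        // Prints: query3 query7 query55
        for (String name : TpcdsParametersReader.getAndCheckQueryNameArray(options)) {
          System.out.print(name + " ");
        }
      }
    }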
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
index 1070a88..4b80aa8 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
@@ -17,40 +17,42 @@
*/
package org.apache.beam.sdk.tpcds;
+import java.util.concurrent.Callable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
-import java.util.concurrent.Callable;
-/**
- * To fulfill multi-threaded execution
- */
+/** To fulfill multi-threaded execution. */
public class TpcdsRun implements Callable<TpcdsRunResult> {
- private final Pipeline pipeline;
+ private final Pipeline pipeline;
- public TpcdsRun (Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- @Override
- public TpcdsRunResult call() {
- TpcdsRunResult tpcdsRunResult;
+ public TpcdsRun(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
- try {
- PipelineResult pipelineResult = pipeline.run();
- long startTimeStamp = System.currentTimeMillis();
- State state = pipelineResult.waitUntilFinish();
- long endTimeStamp = System.currentTimeMillis();
+ @Override
+ public TpcdsRunResult call() {
+ TpcdsRunResult tpcdsRunResult;
- // Make sure to set the job status to be successful only when pipelineResult's final state is DONE.
- boolean isSuccessful = state == State.DONE;
- tpcdsRunResult = new TpcdsRunResult(isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
- } catch (Exception e) {
- // If the pipeline execution failed, return a result with failed status but don't interrupt other threads.
- e.printStackTrace();
- tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
- }
+ try {
+ PipelineResult pipelineResult = pipeline.run();
+ long startTimeStamp = System.currentTimeMillis();
+ State state = pipelineResult.waitUntilFinish();
+ long endTimeStamp = System.currentTimeMillis();
- return tpcdsRunResult;
+ // Make sure to set the job status to be successful only when pipelineResult's final state is
+ // DONE.
+ boolean isSuccessful = state == State.DONE;
+ tpcdsRunResult =
+ new TpcdsRunResult(
+ isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
+ } catch (Exception e) {
+ // If the pipeline execution failed, return a result with failed status but don't interrupt
+ // other threads.
+ e.printStackTrace();
+ tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
}
+
+ return tpcdsRunResult;
+ }
}
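
Since TpcdsRun is a Callable, multi-threaded execution boils down to submitting one instance per pipeline to an ExecutorService and collecting the futures. A minimal, illustrative sketch follows; the helper class and method below are not part of this commit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.beam.sdk.Pipeline;

    public class TpcdsRunSketch {
      // Runs each pipeline on its own thread via TpcdsRun and waits for all results.
      static List<TpcdsRunResult> runAll(List<Pipeline> pipelines, int nThreads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        List<Future<TpcdsRunResult>> futures = new ArrayList<>();
        for (Pipeline pipeline : pipelines) {
          futures.add(executor.submit(new TpcdsRun(pipeline)));
        }
        List<TpcdsRunResult> results = new ArrayList<>();
        for (Future<TpcdsRunResult> future : futures) {
          results.add(future.get());
        }
        executor.shutdown();
        return results;
      }
    }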
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
index 0e22dce..c89a34a 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -17,76 +17,89 @@
*/
package org.apache.beam.sdk.tpcds;
+import java.sql.Timestamp;
+import java.util.Date;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import java.sql.Timestamp;
-import java.util.Date;
-
+import org.checkerframework.checker.nullness.qual.Nullable;
public class TpcdsRunResult {
- private boolean isSuccessful;
- private long startTime;
- private long endTime;
- private PipelineOptions pipelineOptions;
- private PipelineResult pipelineResult;
+ private final boolean isSuccessful;
+ private final long startTime;
+ private final long endTime;
+ private final PipelineOptions pipelineOptions;
+ private final @Nullable PipelineResult pipelineResult;
- public TpcdsRunResult(boolean isSuccessful, long startTime, long endTime, PipelineOptions pipelineOptions, PipelineResult pipelineResult) {
- this.isSuccessful = isSuccessful;
- this.startTime = startTime;
- this.endTime = endTime;
- this.pipelineOptions = pipelineOptions;
- this.pipelineResult = pipelineResult;
- }
+ public TpcdsRunResult(
+ boolean isSuccessful,
+ long startTime,
+ long endTime,
+ PipelineOptions pipelineOptions,
+ @Nullable PipelineResult pipelineResult) {
+ this.isSuccessful = isSuccessful;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.pipelineOptions = pipelineOptions;
+ this.pipelineResult = pipelineResult;
+ }
- public boolean getIsSuccessful() { return isSuccessful; }
+ public boolean getIsSuccessful() {
+ return isSuccessful;
+ }
- public Date getStartDate() {
- Timestamp startTimeStamp = new Timestamp(startTime);
- Date startDate = new Date(startTimeStamp.getTime());
- return startDate;
- }
+ public Date getStartDate() {
+ Timestamp startTimeStamp = new Timestamp(startTime);
+ Date startDate = new Date(startTimeStamp.getTime());
+ return startDate;
+ }
- public Date getEndDate() {
- Timestamp endTimeStamp = new Timestamp(endTime);
- Date endDate = new Date(endTimeStamp.getTime());
- return endDate;
- }
+ public Date getEndDate() {
+ Timestamp endTimeStamp = new Timestamp(endTime);
+ Date endDate = new Date(endTimeStamp.getTime());
+ return endDate;
+ }
- public double getElapsedTime() {
- return (endTime - startTime) / 1000.0;
- }
+ public double getElapsedTime() {
+ return (endTime - startTime) / 1000.0;
+ }
- public PipelineOptions getPipelineOptions() { return pipelineOptions; }
+ public PipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
- public PipelineResult getPipelineResult() { return pipelineResult; }
+ public @Nullable PipelineResult getPipelineResult() {
+ return pipelineResult;
+ }
- public String getJobName() {
- PipelineOptions pipelineOptions = getPipelineOptions();
- return pipelineOptions.getJobName();
- }
+ public String getJobName() {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ return pipelineOptions.getJobName();
+ }
- public String getQueryName() {
- String jobName = getJobName();
- int endIndex = jobName.indexOf("result");
- String queryName = jobName.substring(0, endIndex);
- return queryName;
- }
+ public String getQueryName() {
+ String jobName = getJobName();
+ int endIndex = jobName.indexOf("result");
+ String queryName = jobName.substring(0, endIndex);
+ return queryName;
+ }
- public String getDataSize() throws Exception {
- PipelineOptions pipelineOptions = getPipelineOptions();
- return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
- }
+ public String getDataSize() throws Exception {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
+ }
- public String getDialect() throws Exception {
- PipelineOptions pipelineOptions = getPipelineOptions();
- String queryPlannerClassName = pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
- String dialect;
- if (queryPlannerClassName.equals("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
- dialect = "ZetaSQL";
- } else {
- dialect = "Calcite";
- }
- return dialect;
+ public String getDialect() throws Exception {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ String queryPlannerClassName =
+ pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
+ String dialect;
+ if (queryPlannerClassName.equals(
+ "org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
+ dialect = "ZetaSQL";
+ } else {
+ dialect = "Calcite";
}
+ return dialect;
+ }
}
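
Note that getQueryName() above assumes the job name embeds the query name followed by the literal "result" (for example "query3result..."), which appears to be the naming convention used by the launcher. An illustrative sketch of reading the summary fields back from a completed run (not part of the commit):

    public class RunResultSketch {
      // Illustrative only: print summary fields from a completed run. Assumes the job
      // name contains the literal "result" (e.g. "query3result..."), as getQueryName()
      // requires, otherwise the substring call there would fail.
      static void printSummary(TpcdsRunResult result) throws Exception {
        System.out.println("query:   " + result.getQueryName()); // e.g. "query3"
        System.out.println("dialect: " + result.getDialect());   // "Calcite" or "ZetaSQL"
        System.out.println("start:   " + result.getStartDate());
        System.out.println("elapsed: " + result.getElapsedTime() + " s");
        System.out.println("status:  " + (result.getIsSuccessful() ? "OK" : "FAILED"));
      }
    }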
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
index b776551..7f3d874 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsSchemas.java
@@ -17,656 +17,706 @@
*/
package org.apache.beam.sdk.tpcds;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
public class TpcdsSchemas {
- /**
- * Get all tpcds table schemas automatically by reading json files.
- * In this case all field will be nullable, this is a bit different from the tpcds specification, but doesn't affect query execution.
- *
- * @return A map of all tpcds table schemas with their table names as keys.
- * @throws Exception
- */
- public static Map<String, Schema> getTpcdsSchemas() throws Exception {
- Map<String, Schema> schemaMap = new HashMap<>();
-
- List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
- for (String tableName : tableNames) {
- Schema.Builder schemaBuilder = Schema.builder();
-
- String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
- String[] nameTypePairs = tableSchemaString.split(",");
-
- for (String nameTypePairString : nameTypePairs) {
- String[] nameTypePair = nameTypePairString.split("\\s+");
- String name = nameTypePair[0];
- String type = nameTypePair[1];
-
- Schema.FieldType fieldType;
- if (type.equals("bigint")) {
- fieldType = Schema.FieldType.INT64;
- } else if (type.equals("double")) {
- fieldType = Schema.FieldType.DOUBLE;
- } else {
- fieldType = Schema.FieldType.STRING;
- }
-
- schemaBuilder.addNullableField(name, fieldType);
- }
-
- Schema tableSchema = schemaBuilder.build();
- schemaMap.put(tableName,tableSchema);
+ /**
+   * Get all tpcds table schemas automatically by reading json files. In this case all fields will
+   * be nullable; this is a bit different from the tpcds specification, but doesn't affect query
+ * execution.
+ *
+ * @return A map of all tpcds table schemas with their table names as keys.
+ * @throws Exception
+ */
+ @SuppressWarnings("StringSplitter")
+ public static Map<String, Schema> getTpcdsSchemas() throws Exception {
+ Map<String, Schema> schemaMap = new HashMap<>();
+
+ List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+ for (String tableName : tableNames) {
+ Schema.Builder schemaBuilder = Schema.builder();
+
+ String tableSchemaString = TableSchemaJSONLoader.parseTableSchema(tableName);
+ String[] nameTypePairs = tableSchemaString.split(",");
+
+ for (String nameTypePairString : nameTypePairs) {
+ String[] nameTypePair = nameTypePairString.split("\\s+");
+ String name = nameTypePair[0];
+ String type = nameTypePair[1];
+
+ Schema.FieldType fieldType;
+ if (type.equals("bigint")) {
+ fieldType = Schema.FieldType.INT64;
+ } else if (type.equals("double")) {
+ fieldType = Schema.FieldType.DOUBLE;
+ } else {
+ fieldType = Schema.FieldType.STRING;
}
- return schemaMap;
- }
- /**
- * Get all tpcds table schemas according to tpcds official specification. Some fields are set to be not nullable.
- *
- * @return A map of all tpcds table schemas with their table names as keys.
- */
- public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
- ImmutableMap<String, Schema> immutableSchemaMap =
- ImmutableMap.<String, Schema> builder()
- .put("call_center", callCenterSchema)
- .put("catalog_page", catalogPageSchema)
- .put("catalog_returns", catalogReturnsSchema)
- .put("catalog_sales", catalogSalesSchema)
- .put("customer", customerSchema)
- .put("customer_address", customerAddressSchema)
- .put("customer_demographics", customerDemographicsSchema)
- .put("date_dim", dateDimSchema)
- .put("household_demographics", householdDemographicsSchema)
- .put("income_band", incomeBandSchema)
- .put("inventory", inventorySchema)
- .put("item", itemSchema)
- .put("promotion", promotionSchema)
- .put("reason", reasonSchema)
- .put("ship_mode", shipModeSchema)
- .put("store", storeSchema)
- .put("store_returns", storeReturnsSchema)
- .put("store_sales", storeSalesSchema)
- .put("time_dim", timeDimSchema)
- .put("warehouse", warehouseSchema)
- .put("web_page", webPageSchema)
- .put("web_returns", webReturnsSchema)
- .put("web_sales", webSalesSchema)
- .put("web_site", webSiteSchema)
- .build();
- return immutableSchemaMap;
- }
+ schemaBuilder.addNullableField(name, fieldType);
+ }
- public static Schema getCallCenterSchema() { return callCenterSchema; }
-
- public static Schema getCatalogPageSchema() { return catalogPageSchema; }
-
- public static Schema getCatalogReturnsSchema() { return catalogReturnsSchema; }
-
- public static Schema getCatalogSalesSchema() { return catalogSalesSchema; }
-
- public static Schema getCustomerSchema() { return customerSchema; }
-
- public static Schema getCustomerAddressSchema() { return customerAddressSchema; }
-
- public static Schema getCustomerDemographicsSchema() { return customerDemographicsSchema; }
-
- public static Schema getDateDimSchema() { return dateDimSchema; }
-
- public static Schema getHouseholdDemographicsSchema() { return householdDemographicsSchema; }
-
- public static Schema getIncomeBandSchema() { return incomeBandSchema; }
-
- public static Schema getInventorySchema() { return inventorySchema; }
-
- public static Schema getItemSchema() { return itemSchema; }
-
- public static Schema getPromotionSchema() { return promotionSchema; }
-
- public static Schema getReasonSchema() { return reasonSchema; }
-
- public static Schema getShipModeSchema() { return shipModeSchema; }
-
- public static Schema getStoreSchema() { return storeSchema; }
-
- public static Schema getStoreReturnsSchema() { return storeReturnsSchema; }
-
- public static Schema getStoreSalesSchema() { return storeSalesSchema; }
-
- public static Schema getTimeDimSchema() { return timeDimSchema; }
-
- public static Schema getWarehouseSchema() { return warehouseSchema; }
-
- public static Schema getWebpageSchema() { return webPageSchema; }
-
- public static Schema getWebReturnsSchema() { return webReturnsSchema; }
-
- public static Schema getWebSalesSchema() { return webSalesSchema; }
-
- public static Schema getWebSiteSchema() { return webSiteSchema; }
-
- private static Schema callCenterSchema =
- Schema.builder()
- .addField("cc_call_center_sk", Schema.FieldType.INT64)
- .addField("cc_call_center_id", Schema.FieldType.STRING)
- .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
- .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
- .addNullableField("cc_name", Schema.FieldType.STRING)
- .addNullableField("cc_class", Schema.FieldType.STRING)
- .addNullableField("cc_employees", Schema.FieldType.INT64)
- .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
- .addNullableField("cc_hours", Schema.FieldType.STRING)
- .addNullableField("cc_manager", Schema.FieldType.STRING)
- .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
- .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
- .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
- .addNullableField("cc_market_manager", Schema.FieldType.STRING)
- .addNullableField("cc_division", Schema.FieldType.INT64)
- .addNullableField("cc_division_name", Schema.FieldType.STRING)
- .addNullableField("cc_company", Schema.FieldType.INT64)
- .addNullableField("cc_company_name", Schema.FieldType.STRING)
- .addNullableField("cc_street_number", Schema.FieldType.STRING)
- .addNullableField("cc_street_name", Schema.FieldType.STRING)
- .addNullableField("cc_street_type", Schema.FieldType.STRING)
- .addNullableField("cc_suite_number", Schema.FieldType.STRING)
- .addNullableField("cc_city", Schema.FieldType.STRING)
- .addNullableField("cc_county", Schema.FieldType.STRING)
- .addNullableField("cc_state", Schema.FieldType.STRING)
- .addNullableField("cc_zip", Schema.FieldType.STRING)
- .addNullableField("cc_country", Schema.FieldType.STRING)
- .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema catalogPageSchema =
- Schema.builder()
- .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
- .addField("cp_catalog_page_id", Schema.FieldType.STRING)
- .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
- .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
- .addNullableField("cp_department", Schema.FieldType.STRING)
- .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
- .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
- .addNullableField("cp_description", Schema.FieldType.STRING)
- .addNullableField("cp_type", Schema.FieldType.STRING)
- .build();
-
- private static Schema catalogReturnsSchema =
- Schema.builder()
- .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
- .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
- .addField("cr_item_sk", Schema.FieldType.INT64)
- .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
- .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
- .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
- .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
- .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
- .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
- .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
- .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
- .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
- .addField("cr_order_number", Schema.FieldType.INT64)
- .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
- .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
- .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
- .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
- .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
- .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
- .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
- .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
- .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema catalogSalesSchema =
- Schema.builder()
- .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
- .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
- .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
- .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
- .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
- .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
- .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
- .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
- .addField("cs_item_sk", Schema.FieldType.INT64)
- .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
- .addField("cs_order_number", Schema.FieldType.INT64)
- .addNullableField("cs_quantity", Schema.FieldType.DOUBLE)
- .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
- .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
- .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
- .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
- .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
- .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
- .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema customerSchema =
- Schema.builder()
- .addField("c_customer_sk", Schema.FieldType.INT64)
- .addField("c_customer_id", Schema.FieldType.STRING)
- .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
- .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
- .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
- .addNullableField("c_salutation", Schema.FieldType.STRING)
- .addNullableField("c_first_name", Schema.FieldType.STRING)
- .addNullableField("c_last_name", Schema.FieldType.STRING)
- .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
- .addNullableField("c_birth_day", Schema.FieldType.INT64)
- .addNullableField("c_birth_month", Schema.FieldType.INT64)
- .addNullableField("c_birth_year", Schema.FieldType.INT64)
- .addNullableField("c_birth_country", Schema.FieldType.STRING)
- .addNullableField("c_login", Schema.FieldType.STRING)
- .addNullableField("c_email_address", Schema.FieldType.STRING)
- .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
- .build();
-
- private static Schema customerAddressSchema =
- Schema.builder()
- .addField("ca_address_sk", Schema.FieldType.INT64)
- .addField("ca_address_id", Schema.FieldType.STRING)
- .addNullableField("ca_street_number", Schema.FieldType.STRING)
- .addNullableField("ca_street_name", Schema.FieldType.STRING)
- .addNullableField("ca_street_type", Schema.FieldType.STRING)
- .addNullableField("ca_suite_number", Schema.FieldType.STRING)
- .addNullableField("ca_city", Schema.FieldType.STRING)
- .addNullableField("ca_county", Schema.FieldType.STRING)
- .addNullableField("ca_state", Schema.FieldType.STRING)
- .addNullableField("ca_zip", Schema.FieldType.STRING)
- .addNullableField("ca_country", Schema.FieldType.STRING)
- .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("ca_location_type", Schema.FieldType.STRING)
- .build();
-
- private static Schema customerDemographicsSchema =
- Schema.builder()
- .addField("cd_demo_sk", Schema.FieldType.INT64)
- .addNullableField("cd_gender", Schema.FieldType.STRING)
- .addNullableField("cd_marital_status", Schema.FieldType.STRING)
- .addNullableField("cd_education_status", Schema.FieldType.STRING)
- .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
- .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
- .addNullableField("cd_dep_count", Schema.FieldType.INT64)
- .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
- .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
- .build();
-
- private static Schema dateDimSchema =
- Schema.builder()
- .addField("d_date_sk", Schema.FieldType.INT64)
- .addField("d_date_id", Schema.FieldType.STRING)
- .addNullableField("d_date", Schema.FieldType.STRING)
- .addNullableField("d_month_seq", Schema.FieldType.INT64)
- .addNullableField("d_week_seq", Schema.FieldType.INT64)
- .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
- .addNullableField("d_year", Schema.FieldType.INT64)
- .addNullableField("d_dow", Schema.FieldType.INT64)
- .addNullableField("d_moy", Schema.FieldType.INT64)
- .addNullableField("d_dom", Schema.FieldType.INT64)
- .addNullableField("d_qoy", Schema.FieldType.INT64)
- .addNullableField("d_fy_year", Schema.FieldType.INT64)
- .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
- .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
- .addNullableField("d_day_name", Schema.FieldType.STRING)
- .addNullableField("d_quarter_name", Schema.FieldType.STRING)
- .addNullableField("d_holiday", Schema.FieldType.STRING)
- .addNullableField("d_weekend", Schema.FieldType.STRING)
- .addNullableField("d_following_holiday", Schema.FieldType.STRING)
- .addNullableField("d_first_dom", Schema.FieldType.INT64)
- .addNullableField("d_last_dom", Schema.FieldType.INT64)
- .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
- .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
- .addNullableField("d_current_day", Schema.FieldType.STRING)
- .addNullableField("d_current_week", Schema.FieldType.STRING)
- .addNullableField("d_current_month", Schema.FieldType.STRING)
- .addNullableField("d_current_quarter", Schema.FieldType.STRING)
- .addNullableField("d_current_year", Schema.FieldType.STRING)
- .build();
-
- private static Schema householdDemographicsSchema =
- Schema.builder()
- .addField("hd_demo_sk", Schema.FieldType.INT64)
- .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
- .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
- .addNullableField("hd_dep_count", Schema.FieldType.INT64)
- .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
- .build();
-
- private static Schema incomeBandSchema =
- Schema.builder()
- .addField("ib_income_band_sk", Schema.FieldType.INT64)
- .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
- .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
- .build();
-
- private static Schema inventorySchema =
- Schema.builder()
- .addField("inv_date_sk", Schema.FieldType.INT32)
- .addField("inv_item_sk", Schema.FieldType.INT32)
- .addField("inv_warehouse_sk", Schema.FieldType.INT32)
- .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
- .build();
-
- private static Schema itemSchema =
- Schema.builder()
- .addField("i_item_sk", Schema.FieldType.INT64)
- .addField("i_item_id", Schema.FieldType.STRING)
- .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("i_item_desc", Schema.FieldType.STRING)
- .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
- .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("i_brand_id", Schema.FieldType.INT64)
- .addNullableField("i_brand", Schema.FieldType.STRING)
- .addNullableField("i_class_id", Schema.FieldType.INT64)
- .addNullableField("i_class", Schema.FieldType.STRING)
- .addNullableField("i_category_id", Schema.FieldType.INT64)
- .addNullableField("i_category", Schema.FieldType.STRING)
- .addNullableField("i_manufact_id", Schema.FieldType.INT64)
- .addNullableField("i_manufact", Schema.FieldType.STRING)
- .addNullableField("i_size", Schema.FieldType.STRING)
- .addNullableField("i_formulation", Schema.FieldType.STRING)
- .addNullableField("i_color", Schema.FieldType.STRING)
- .addNullableField("i_units", Schema.FieldType.STRING)
- .addNullableField("i_container", Schema.FieldType.STRING)
- .addNullableField("i_manager_id", Schema.FieldType.INT64)
- .addNullableField("i_product_name", Schema.FieldType.STRING)
- .build();
-
- private static Schema promotionSchema =
- Schema.builder()
- .addField("p_promo_sk", Schema.FieldType.INT64)
- .addField("p_promo_id", Schema.FieldType.STRING)
- .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
- .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
- .addNullableField("p_item_sk", Schema.FieldType.INT64)
- .addNullableField("p_cost", Schema.FieldType.DOUBLE)
- .addNullableField("p_response_target", Schema.FieldType.INT64)
- .addNullableField("p_promo_name", Schema.FieldType.STRING)
- .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
- .addNullableField("p_channel_email", Schema.FieldType.STRING)
- .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
- .addNullableField("p_channel_tv", Schema.FieldType.STRING)
- .addNullableField("p_channel_radio", Schema.FieldType.STRING)
- .addNullableField("p_channel_press", Schema.FieldType.STRING)
- .addNullableField("p_channel_event", Schema.FieldType.STRING)
- .addNullableField("p_channel_demo", Schema.FieldType.STRING)
- .addNullableField("p_channel_details", Schema.FieldType.STRING)
- .addNullableField("p_purpose", Schema.FieldType.STRING)
- .addNullableField("p_discount_active", Schema.FieldType.STRING)
- .build();
-
- private static Schema reasonSchema =
- Schema.builder()
- .addField("r_reason_sk", Schema.FieldType.INT64)
- .addField("r_reason_id", Schema.FieldType.STRING)
- .addNullableField("r_reason_desc", Schema.FieldType.STRING)
- .build();
-
- private static Schema shipModeSchema =
- Schema.builder()
- .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
- .addField("sm_ship_mode_id", Schema.FieldType.STRING)
- .addNullableField("sm_type", Schema.FieldType.STRING)
- .addNullableField("sm_code", Schema.FieldType.STRING)
- .addNullableField("sm_carrier", Schema.FieldType.STRING)
- .addNullableField("sm_contract", Schema.FieldType.STRING)
- .build();
-
- private static Schema storeSchema =
- Schema.builder()
- .addField("s_store_sk", Schema.FieldType.INT64)
- .addField("s_store_id", Schema.FieldType.STRING)
- .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
- .addNullableField("s_store_name", Schema.FieldType.STRING)
- .addNullableField("s_number_employees", Schema.FieldType.INT64)
- .addNullableField("s_floor_space", Schema.FieldType.INT64)
- .addNullableField("s_hours", Schema.FieldType.STRING)
- .addNullableField("S_manager", Schema.FieldType.STRING)
- .addNullableField("S_market_id", Schema.FieldType.INT64)
- .addNullableField("S_geography_class", Schema.FieldType.STRING)
- .addNullableField("S_market_desc", Schema.FieldType.STRING)
- .addNullableField("s_market_manager", Schema.FieldType.STRING)
- .addNullableField("s_division_id", Schema.FieldType.INT64)
- .addNullableField("s_division_name", Schema.FieldType.STRING)
- .addNullableField("s_company_id", Schema.FieldType.INT64)
- .addNullableField("s_company_name", Schema.FieldType.STRING)
- .addNullableField("s_street_number", Schema.FieldType.STRING)
- .addNullableField("s_street_name", Schema.FieldType.STRING)
- .addNullableField("s_street_type", Schema.FieldType.STRING)
- .addNullableField("s_suite_number", Schema.FieldType.STRING)
- .addNullableField("s_city", Schema.FieldType.STRING)
- .addNullableField("s_county", Schema.FieldType.STRING)
- .addNullableField("s_state", Schema.FieldType.STRING)
- .addNullableField("s_zip", Schema.FieldType.STRING)
- .addNullableField("s_country", Schema.FieldType.STRING)
- .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema storeReturnsSchema =
- Schema.builder()
- .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
- .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
- .addField("sr_item_sk", Schema.FieldType.INT64)
- .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
- .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
- .addNullableField("sr_store_sk", Schema.FieldType.INT64)
- .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
- .addField("sr_ticket_number", Schema.FieldType.INT64)
- .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
- .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
- .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
- .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
- .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
- .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
- .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
- .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
- .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema storeSalesSchema =
- Schema.builder()
- .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
- .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
- .addField("ss_item_sk", Schema.FieldType.INT64)
- .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
- .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
- .addNullableField("ss_store_sk", Schema.FieldType.INT64)
- .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
- .addField("ss_ticket_number", Schema.FieldType.INT64)
- .addNullableField("ss_quantity", Schema.FieldType.INT64)
- .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
- .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
- .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
- .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
- .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema timeDimSchema =
- Schema.builder()
- .addField("t_time_sk", Schema.FieldType.INT64)
- .addField("t_time_id", Schema.FieldType.STRING)
- .addNullableField("t_time", Schema.FieldType.INT64)
- .addNullableField("t_hour", Schema.FieldType.INT64)
- .addNullableField("t_minute", Schema.FieldType.INT64)
- .addNullableField("t_second", Schema.FieldType.INT64)
- .addNullableField("t_am_pm", Schema.FieldType.STRING)
- .addNullableField("t_shift", Schema.FieldType.STRING)
- .addNullableField("t_sub_shift", Schema.FieldType.STRING)
- .addNullableField("t_meal_time", Schema.FieldType.STRING)
- .build();
-
- private static Schema warehouseSchema =
- Schema.builder()
- .addField("w_warehouse_sk", Schema.FieldType.INT64)
- .addField("w_warehouse_id", Schema.FieldType.STRING)
- .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
- .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
- .addNullableField("w_street_number", Schema.FieldType.STRING)
- .addNullableField("w_street_name", Schema.FieldType.STRING)
- .addNullableField("w_street_type", Schema.FieldType.STRING)
- .addNullableField("w_suite_number", Schema.FieldType.STRING)
- .addNullableField("w_city", Schema.FieldType.STRING)
- .addNullableField("w_county", Schema.FieldType.STRING)
- .addNullableField("w_state", Schema.FieldType.STRING)
- .addNullableField("w_zip", Schema.FieldType.STRING)
- .addNullableField("w_country", Schema.FieldType.STRING)
- .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema webPageSchema =
- Schema.builder()
- .addField("wp_web_page_sk", Schema.FieldType.INT64)
- .addField("wp_web_page_id", Schema.FieldType.STRING)
- .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
- .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
- .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
- .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
- .addNullableField("wp_url", Schema.FieldType.STRING)
- .addNullableField("wp_type", Schema.FieldType.STRING)
- .addNullableField("wp_char_count", Schema.FieldType.INT64)
- .addNullableField("wp_link_count", Schema.FieldType.INT64)
- .addNullableField("wp_image_count", Schema.FieldType.INT64)
- .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
- .build();
-
- private static Schema webReturnsSchema =
- Schema.builder()
- .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
- .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
- .addField("wr_item_sk", Schema.FieldType.INT64)
- .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
- .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
- .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
- .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
- .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
- .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
- .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
- .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
- .addField("wr_order_number", Schema.FieldType.INT64)
- .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
- .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
- .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
- .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
- .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
- .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
- .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
- .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
- .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema webSalesSchema =
- Schema.builder()
- .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
- .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
- .addField("ws_item_sk", Schema.FieldType.INT32)
- .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
- .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
- .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
- .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
- .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
- .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
- .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
- .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
- .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
- .addField("ws_order_number", Schema.FieldType.INT64)
- .addNullableField("ws_quantity", Schema.FieldType.INT32)
- .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
- .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
- .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
- .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
- .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
- .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
- .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
- .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
- .build();
-
- private static Schema webSiteSchema =
- Schema.builder()
- .addField("web_site_sk", Schema.FieldType.STRING)
- .addField("web_site_id", Schema.FieldType.STRING)
- .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("web_name", Schema.FieldType.STRING)
- .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
- .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
- .addNullableField("web_class", Schema.FieldType.STRING)
- .addNullableField("web_manager", Schema.FieldType.STRING)
- .addNullableField("web_mkt_id", Schema.FieldType.INT32)
- .addNullableField("web_mkt_class", Schema.FieldType.STRING)
- .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
- .addNullableField("web_market_manager", Schema.FieldType.STRING)
- .addNullableField("web_company_id", Schema.FieldType.INT32)
- .addNullableField("web_company_name", Schema.FieldType.STRING)
- .addNullableField("web_street_number", Schema.FieldType.STRING)
- .addNullableField("web_street_name", Schema.FieldType.STRING)
- .addNullableField("web_street_type", Schema.FieldType.STRING)
- .addNullableField("web_suite_number", Schema.FieldType.STRING)
- .addNullableField("web_city", Schema.FieldType.STRING)
- .addNullableField("web_county", Schema.FieldType.STRING)
- .addNullableField("web_state", Schema.FieldType.STRING)
- .addNullableField("web_zip", Schema.FieldType.STRING)
- .addNullableField("web_country", Schema.FieldType.STRING)
- .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
- .build();
+ Schema tableSchema = schemaBuilder.build();
+ schemaMap.put(tableName, tableSchema);
+ }
+ return schemaMap;
+ }
+
+ /**
+   * Get all TPC-DS table schemas according to the official TPC-DS specification. Some fields are
+   * non-nullable.
+   *
+   * @return A map of all TPC-DS table schemas keyed by table name.
+ */
+ public static Map<String, Schema> getTpcdsSchemasImmutableMap() {
+ ImmutableMap<String, Schema> immutableSchemaMap =
+ ImmutableMap.<String, Schema>builder()
+ .put("call_center", callCenterSchema)
+ .put("catalog_page", catalogPageSchema)
+ .put("catalog_returns", catalogReturnsSchema)
+ .put("catalog_sales", catalogSalesSchema)
+ .put("customer", customerSchema)
+ .put("customer_address", customerAddressSchema)
+ .put("customer_demographics", customerDemographicsSchema)
+ .put("date_dim", dateDimSchema)
+ .put("household_demographics", householdDemographicsSchema)
+ .put("income_band", incomeBandSchema)
+ .put("inventory", inventorySchema)
+ .put("item", itemSchema)
+ .put("promotion", promotionSchema)
+ .put("reason", reasonSchema)
+ .put("ship_mode", shipModeSchema)
+ .put("store", storeSchema)
+ .put("store_returns", storeReturnsSchema)
+ .put("store_sales", storeSalesSchema)
+ .put("time_dim", timeDimSchema)
+ .put("warehouse", warehouseSchema)
+ .put("web_page", webPageSchema)
+ .put("web_returns", webReturnsSchema)
+ .put("web_sales", webSalesSchema)
+ .put("web_site", webSiteSchema)
+ .build();
+ return immutableSchemaMap;
+ }
+
+ public static Schema getCallCenterSchema() {
+ return callCenterSchema;
+ }
+
+ public static Schema getCatalogPageSchema() {
+ return catalogPageSchema;
+ }
+
+ public static Schema getCatalogReturnsSchema() {
+ return catalogReturnsSchema;
+ }
+
+ public static Schema getCatalogSalesSchema() {
+ return catalogSalesSchema;
+ }
+
+ public static Schema getCustomerSchema() {
+ return customerSchema;
+ }
+
+ public static Schema getCustomerAddressSchema() {
+ return customerAddressSchema;
+ }
+
+ public static Schema getCustomerDemographicsSchema() {
+ return customerDemographicsSchema;
+ }
+
+ public static Schema getDateDimSchema() {
+ return dateDimSchema;
+ }
+
+ public static Schema getHouseholdDemographicsSchema() {
+ return householdDemographicsSchema;
+ }
+
+ public static Schema getIncomeBandSchema() {
+ return incomeBandSchema;
+ }
+
+ public static Schema getInventorySchema() {
+ return inventorySchema;
+ }
+
+ public static Schema getItemSchema() {
+ return itemSchema;
+ }
+
+ public static Schema getPromotionSchema() {
+ return promotionSchema;
+ }
+
+ public static Schema getReasonSchema() {
+ return reasonSchema;
+ }
+
+ public static Schema getShipModeSchema() {
+ return shipModeSchema;
+ }
+
+ public static Schema getStoreSchema() {
+ return storeSchema;
+ }
+
+ public static Schema getStoreReturnsSchema() {
+ return storeReturnsSchema;
+ }
+
+ public static Schema getStoreSalesSchema() {
+ return storeSalesSchema;
+ }
+
+ public static Schema getTimeDimSchema() {
+ return timeDimSchema;
+ }
+
+ public static Schema getWarehouseSchema() {
+ return warehouseSchema;
+ }
+
+ public static Schema getWebpageSchema() {
+ return webPageSchema;
+ }
+
+ public static Schema getWebReturnsSchema() {
+ return webReturnsSchema;
+ }
+
+ public static Schema getWebSalesSchema() {
+ return webSalesSchema;
+ }
+
+ public static Schema getWebSiteSchema() {
+ return webSiteSchema;
+ }
+
+ private static Schema callCenterSchema =
+ Schema.builder()
+ .addField("cc_call_center_sk", Schema.FieldType.INT64)
+ .addField("cc_call_center_id", Schema.FieldType.STRING)
+ .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cc_name", Schema.FieldType.STRING)
+ .addNullableField("cc_class", Schema.FieldType.STRING)
+ .addNullableField("cc_employees", Schema.FieldType.INT64)
+ .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+ .addNullableField("cc_hours", Schema.FieldType.STRING)
+ .addNullableField("cc_manager", Schema.FieldType.STRING)
+ .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+ .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+ .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+ .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+ .addNullableField("cc_division", Schema.FieldType.INT64)
+ .addNullableField("cc_division_name", Schema.FieldType.STRING)
+ .addNullableField("cc_company", Schema.FieldType.INT64)
+ .addNullableField("cc_company_name", Schema.FieldType.STRING)
+ .addNullableField("cc_street_number", Schema.FieldType.STRING)
+ .addNullableField("cc_street_name", Schema.FieldType.STRING)
+ .addNullableField("cc_street_type", Schema.FieldType.STRING)
+ .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+ .addNullableField("cc_city", Schema.FieldType.STRING)
+ .addNullableField("cc_county", Schema.FieldType.STRING)
+ .addNullableField("cc_state", Schema.FieldType.STRING)
+ .addNullableField("cc_zip", Schema.FieldType.STRING)
+ .addNullableField("cc_country", Schema.FieldType.STRING)
+ .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema catalogPageSchema =
+ Schema.builder()
+ .addField("cp_catalog_page_sk", Schema.FieldType.INT64)
+ .addField("cp_catalog_page_id", Schema.FieldType.STRING)
+ .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cp_department", Schema.FieldType.STRING)
+ .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+ .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+ .addNullableField("cp_description", Schema.FieldType.STRING)
+ .addNullableField("cp_type", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema catalogReturnsSchema =
+ Schema.builder()
+ .addNullableField("cr_returned_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_returned_time_sk", Schema.FieldType.INT64)
+ .addField("cr_item_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_refunded_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_refunded_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_refunded_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_refunded_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_returning_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_returning_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_returning_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_returning_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_call_center_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_catalog_page_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_ship_mode_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_warehouse_sk", Schema.FieldType.INT64)
+ .addNullableField("cr_reason_sk", Schema.FieldType.INT64)
+ .addField("cr_order_number", Schema.FieldType.INT64)
+ .addNullableField("cr_return_quantity", Schema.FieldType.INT64)
+ .addNullableField("cr_return_amount", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_return_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_fee", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_return_ship_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_refunded_cash", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_reversed_charge", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_store_credit", Schema.FieldType.DOUBLE)
+ .addNullableField("cr_net_loss", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema catalogSalesSchema =
+ Schema.builder()
+ .addNullableField("cs_sold_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_sold_time_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_call_center_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_warehouse_sk", Schema.FieldType.INT64)
+ .addField("cs_item_sk", Schema.FieldType.INT64)
+ .addNullableField("cs_promo_sk", Schema.FieldType.INT64)
+ .addField("cs_order_number", Schema.FieldType.INT64)
+ .addNullableField("cs_quantity", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_discount_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_coupon_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_ext_ship_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_net_paid", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("cs_net_profit", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema customerSchema =
+ Schema.builder()
+ .addField("c_customer_sk", Schema.FieldType.INT64)
+ .addField("c_customer_id", Schema.FieldType.STRING)
+ .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("c_current_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT64)
+ .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT64)
+ .addNullableField("c_salutation", Schema.FieldType.STRING)
+ .addNullableField("c_first_name", Schema.FieldType.STRING)
+ .addNullableField("c_last_name", Schema.FieldType.STRING)
+ .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
+ .addNullableField("c_birth_day", Schema.FieldType.INT64)
+ .addNullableField("c_birth_month", Schema.FieldType.INT64)
+ .addNullableField("c_birth_year", Schema.FieldType.INT64)
+ .addNullableField("c_birth_country", Schema.FieldType.STRING)
+ .addNullableField("c_login", Schema.FieldType.STRING)
+ .addNullableField("c_email_address", Schema.FieldType.STRING)
+ .addNullableField("c_last_review_date_sk", Schema.FieldType.INT64)
+ .build();
+
+ private static Schema customerAddressSchema =
+ Schema.builder()
+ .addField("ca_address_sk", Schema.FieldType.INT64)
+ .addField("ca_address_id", Schema.FieldType.STRING)
+ .addNullableField("ca_street_number", Schema.FieldType.STRING)
+ .addNullableField("ca_street_name", Schema.FieldType.STRING)
+ .addNullableField("ca_street_type", Schema.FieldType.STRING)
+ .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+ .addNullableField("ca_city", Schema.FieldType.STRING)
+ .addNullableField("ca_county", Schema.FieldType.STRING)
+ .addNullableField("ca_state", Schema.FieldType.STRING)
+ .addNullableField("ca_zip", Schema.FieldType.STRING)
+ .addNullableField("ca_country", Schema.FieldType.STRING)
+ .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("ca_location_type", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema customerDemographicsSchema =
+ Schema.builder()
+ .addField("cd_demo_sk", Schema.FieldType.INT64)
+ .addNullableField("cd_gender", Schema.FieldType.STRING)
+ .addNullableField("cd_marital_status", Schema.FieldType.STRING)
+ .addNullableField("cd_education_status", Schema.FieldType.STRING)
+ .addNullableField("cd_purchase_estimate", Schema.FieldType.INT64)
+ .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
+ .addNullableField("cd_dep_count", Schema.FieldType.INT64)
+ .addNullableField("cd_dep_employed_count", Schema.FieldType.INT64)
+ .addNullableField("cd_dep_college_count", Schema.FieldType.INT64)
+ .build();
+
+ private static Schema dateDimSchema =
+ Schema.builder()
+ .addField("d_date_sk", Schema.FieldType.INT64)
+ .addField("d_date_id", Schema.FieldType.STRING)
+ .addNullableField("d_date", Schema.FieldType.STRING)
+ .addNullableField("d_month_seq", Schema.FieldType.INT64)
+ .addNullableField("d_week_seq", Schema.FieldType.INT64)
+ .addNullableField("d_quarter_seq", Schema.FieldType.INT64)
+ .addNullableField("d_year", Schema.FieldType.INT64)
+ .addNullableField("d_dow", Schema.FieldType.INT64)
+ .addNullableField("d_moy", Schema.FieldType.INT64)
+ .addNullableField("d_dom", Schema.FieldType.INT64)
+ .addNullableField("d_qoy", Schema.FieldType.INT64)
+ .addNullableField("d_fy_year", Schema.FieldType.INT64)
+ .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT64)
+ .addNullableField("d_fy_week_seq", Schema.FieldType.INT64)
+ .addNullableField("d_day_name", Schema.FieldType.STRING)
+ .addNullableField("d_quarter_name", Schema.FieldType.STRING)
+ .addNullableField("d_holiday", Schema.FieldType.STRING)
+ .addNullableField("d_weekend", Schema.FieldType.STRING)
+ .addNullableField("d_following_holiday", Schema.FieldType.STRING)
+ .addNullableField("d_first_dom", Schema.FieldType.INT64)
+ .addNullableField("d_last_dom", Schema.FieldType.INT64)
+ .addNullableField("d_same_day_ly", Schema.FieldType.INT64)
+ .addNullableField("d_same_day_lq", Schema.FieldType.INT64)
+ .addNullableField("d_current_day", Schema.FieldType.STRING)
+ .addNullableField("d_current_week", Schema.FieldType.STRING)
+ .addNullableField("d_current_month", Schema.FieldType.STRING)
+ .addNullableField("d_current_quarter", Schema.FieldType.STRING)
+ .addNullableField("d_current_year", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema householdDemographicsSchema =
+ Schema.builder()
+ .addField("hd_demo_sk", Schema.FieldType.INT64)
+ .addNullableField("hd_income_band_sk", Schema.FieldType.INT64)
+ .addNullableField("hd_buy_potential", Schema.FieldType.STRING)
+ .addNullableField("hd_dep_count", Schema.FieldType.INT64)
+ .addNullableField("hd_vehicle_count", Schema.FieldType.INT64)
+ .build();
+
+ private static Schema incomeBandSchema =
+ Schema.builder()
+ .addField("ib_income_band_sk", Schema.FieldType.INT64)
+ .addNullableField("ib_lower_bound", Schema.FieldType.INT64)
+ .addNullableField("ib_upper_bound", Schema.FieldType.INT64)
+ .build();
+
+ private static Schema inventorySchema =
+ Schema.builder()
+ .addField("inv_date_sk", Schema.FieldType.INT32)
+ .addField("inv_item_sk", Schema.FieldType.INT32)
+ .addField("inv_warehouse_sk", Schema.FieldType.INT32)
+ .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
+ .build();
+
+ private static Schema itemSchema =
+ Schema.builder()
+ .addField("i_item_sk", Schema.FieldType.INT64)
+ .addField("i_item_id", Schema.FieldType.STRING)
+ .addNullableField("i_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("i_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("i_item_desc", Schema.FieldType.STRING)
+ .addNullableField("i_current_price", Schema.FieldType.DOUBLE)
+ .addNullableField("i_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("i_brand_id", Schema.FieldType.INT64)
+ .addNullableField("i_brand", Schema.FieldType.STRING)
+ .addNullableField("i_class_id", Schema.FieldType.INT64)
+ .addNullableField("i_class", Schema.FieldType.STRING)
+ .addNullableField("i_category_id", Schema.FieldType.INT64)
+ .addNullableField("i_category", Schema.FieldType.STRING)
+ .addNullableField("i_manufact_id", Schema.FieldType.INT64)
+ .addNullableField("i_manufact", Schema.FieldType.STRING)
+ .addNullableField("i_size", Schema.FieldType.STRING)
+ .addNullableField("i_formulation", Schema.FieldType.STRING)
+ .addNullableField("i_color", Schema.FieldType.STRING)
+ .addNullableField("i_units", Schema.FieldType.STRING)
+ .addNullableField("i_container", Schema.FieldType.STRING)
+ .addNullableField("i_manager_id", Schema.FieldType.INT64)
+ .addNullableField("i_product_name", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema promotionSchema =
+ Schema.builder()
+ .addField("p_promo_sk", Schema.FieldType.INT64)
+ .addField("p_promo_id", Schema.FieldType.STRING)
+ .addNullableField("p_start_date_sk", Schema.FieldType.INT64)
+ .addNullableField("p_end_date_sk", Schema.FieldType.INT64)
+ .addNullableField("p_item_sk", Schema.FieldType.INT64)
+ .addNullableField("p_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("p_response_target", Schema.FieldType.INT64)
+ .addNullableField("p_promo_name", Schema.FieldType.STRING)
+ .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
+ .addNullableField("p_channel_email", Schema.FieldType.STRING)
+ .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
+ .addNullableField("p_channel_tv", Schema.FieldType.STRING)
+ .addNullableField("p_channel_radio", Schema.FieldType.STRING)
+ .addNullableField("p_channel_press", Schema.FieldType.STRING)
+ .addNullableField("p_channel_event", Schema.FieldType.STRING)
+ .addNullableField("p_channel_demo", Schema.FieldType.STRING)
+ .addNullableField("p_channel_details", Schema.FieldType.STRING)
+ .addNullableField("p_purpose", Schema.FieldType.STRING)
+ .addNullableField("p_discount_active", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema reasonSchema =
+ Schema.builder()
+ .addField("r_reason_sk", Schema.FieldType.INT64)
+ .addField("r_reason_id", Schema.FieldType.STRING)
+ .addNullableField("r_reason_desc", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema shipModeSchema =
+ Schema.builder()
+ .addField("sm_ship_mode_sk", Schema.FieldType.INT64)
+ .addField("sm_ship_mode_id", Schema.FieldType.STRING)
+ .addNullableField("sm_type", Schema.FieldType.STRING)
+ .addNullableField("sm_code", Schema.FieldType.STRING)
+ .addNullableField("sm_carrier", Schema.FieldType.STRING)
+ .addNullableField("sm_contract", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema storeSchema =
+ Schema.builder()
+ .addField("s_store_sk", Schema.FieldType.INT64)
+ .addField("s_store_id", Schema.FieldType.STRING)
+ .addNullableField("s_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("s_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("s_closed_date_sk", Schema.FieldType.INT64)
+ .addNullableField("s_store_name", Schema.FieldType.STRING)
+ .addNullableField("s_number_employees", Schema.FieldType.INT64)
+ .addNullableField("s_floor_space", Schema.FieldType.INT64)
+ .addNullableField("s_hours", Schema.FieldType.STRING)
+ .addNullableField("S_manager", Schema.FieldType.STRING)
+ .addNullableField("S_market_id", Schema.FieldType.INT64)
+ .addNullableField("S_geography_class", Schema.FieldType.STRING)
+ .addNullableField("S_market_desc", Schema.FieldType.STRING)
+ .addNullableField("s_market_manager", Schema.FieldType.STRING)
+ .addNullableField("s_division_id", Schema.FieldType.INT64)
+ .addNullableField("s_division_name", Schema.FieldType.STRING)
+ .addNullableField("s_company_id", Schema.FieldType.INT64)
+ .addNullableField("s_company_name", Schema.FieldType.STRING)
+ .addNullableField("s_street_number", Schema.FieldType.STRING)
+ .addNullableField("s_street_name", Schema.FieldType.STRING)
+ .addNullableField("s_street_type", Schema.FieldType.STRING)
+ .addNullableField("s_suite_number", Schema.FieldType.STRING)
+ .addNullableField("s_city", Schema.FieldType.STRING)
+ .addNullableField("s_county", Schema.FieldType.STRING)
+ .addNullableField("s_state", Schema.FieldType.STRING)
+ .addNullableField("s_zip", Schema.FieldType.STRING)
+ .addNullableField("s_country", Schema.FieldType.STRING)
+ .addNullableField("s_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("s_tax_percentage", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema storeReturnsSchema =
+ Schema.builder()
+ .addNullableField("sr_returned_date_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_return_time_sk", Schema.FieldType.INT64)
+ .addField("sr_item_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_store_sk", Schema.FieldType.INT64)
+ .addNullableField("sr_reason_sk", Schema.FieldType.INT64)
+ .addField("sr_ticket_number", Schema.FieldType.INT64)
+ .addNullableField("sr_return_quantity", Schema.FieldType.INT64)
+ .addNullableField("sr_return_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_return_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_fee", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_return_ship_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_refunded_cash", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_reversed_charge", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_store_credit", Schema.FieldType.DOUBLE)
+ .addNullableField("sr_net_loss", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema storeSalesSchema =
+ Schema.builder()
+ .addNullableField("ss_sold_date_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_sold_time_sk", Schema.FieldType.INT64)
+ .addField("ss_item_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_store_sk", Schema.FieldType.INT64)
+ .addNullableField("ss_promo_sk", Schema.FieldType.INT64)
+ .addField("ss_ticket_number", Schema.FieldType.INT64)
+ .addNullableField("ss_quantity", Schema.FieldType.INT64)
+ .addNullableField("ss_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_ext_discount_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_ext_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_ext_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_ext_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_coupon_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_net_paid", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("ss_net_profit", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema timeDimSchema =
+ Schema.builder()
+ .addField("t_time_sk", Schema.FieldType.INT64)
+ .addField("t_time_id", Schema.FieldType.STRING)
+ .addNullableField("t_time", Schema.FieldType.INT64)
+ .addNullableField("t_hour", Schema.FieldType.INT64)
+ .addNullableField("t_minute", Schema.FieldType.INT64)
+ .addNullableField("t_second", Schema.FieldType.INT64)
+ .addNullableField("t_am_pm", Schema.FieldType.STRING)
+ .addNullableField("t_shift", Schema.FieldType.STRING)
+ .addNullableField("t_sub_shift", Schema.FieldType.STRING)
+ .addNullableField("t_meal_time", Schema.FieldType.STRING)
+ .build();
+
+ private static Schema warehouseSchema =
+ Schema.builder()
+ .addField("w_warehouse_sk", Schema.FieldType.INT64)
+ .addField("w_warehouse_id", Schema.FieldType.STRING)
+ .addNullableField("w_warehouse_name", Schema.FieldType.STRING)
+ .addNullableField("w_warehouse_sq_ft", Schema.FieldType.INT64)
+ .addNullableField("w_street_number", Schema.FieldType.STRING)
+ .addNullableField("w_street_name", Schema.FieldType.STRING)
+ .addNullableField("w_street_type", Schema.FieldType.STRING)
+ .addNullableField("w_suite_number", Schema.FieldType.STRING)
+ .addNullableField("w_city", Schema.FieldType.STRING)
+ .addNullableField("w_county", Schema.FieldType.STRING)
+ .addNullableField("w_state", Schema.FieldType.STRING)
+ .addNullableField("w_zip", Schema.FieldType.STRING)
+ .addNullableField("w_country", Schema.FieldType.STRING)
+ .addNullableField("w_gmt_offset", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema webPageSchema =
+ Schema.builder()
+ .addField("wp_web_page_sk", Schema.FieldType.INT64)
+ .addField("wp_web_page_id", Schema.FieldType.STRING)
+ .addNullableField("wp_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("wp_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("wp_creation_date_sk", Schema.FieldType.INT64)
+ .addNullableField("wp_access_date_sk", Schema.FieldType.INT64)
+ .addNullableField("wp_autogen_flag", Schema.FieldType.STRING)
+ .addNullableField("wp_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("wp_url", Schema.FieldType.STRING)
+ .addNullableField("wp_type", Schema.FieldType.STRING)
+ .addNullableField("wp_char_count", Schema.FieldType.INT64)
+ .addNullableField("wp_link_count", Schema.FieldType.INT64)
+ .addNullableField("wp_image_count", Schema.FieldType.INT64)
+ .addNullableField("wp_max_ad_count", Schema.FieldType.INT64)
+ .build();
+
+ private static Schema webReturnsSchema =
+ Schema.builder()
+ .addNullableField("wr_returned_date_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_returned_time_sk", Schema.FieldType.INT64)
+ .addField("wr_item_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_refunded_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_refunded_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_refunded_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_refunded_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_returning_customer_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_returning_cdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_returning_hdemo_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_returning_addr_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_web_page_sk", Schema.FieldType.INT64)
+ .addNullableField("wr_reason_sk", Schema.FieldType.INT64)
+ .addField("wr_order_number", Schema.FieldType.INT64)
+ .addNullableField("wr_return_quantity", Schema.FieldType.INT64)
+ .addNullableField("wr_return_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_return_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_return_amt_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_fee", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_return_ship_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_refunded_cash", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_reversed_charge", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_account_credit", Schema.FieldType.DOUBLE)
+ .addNullableField("wr_net_loss", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema webSalesSchema =
+ Schema.builder()
+ .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
+ .addField("ws_item_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
+ .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
+ .addField("ws_order_number", Schema.FieldType.INT64)
+ .addNullableField("ws_quantity", Schema.FieldType.INT32)
+ .addNullableField("ws_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_discount_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_sales_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_list_price", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_coupon_amt", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_ext_ship_cost", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_net_paid", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.DOUBLE)
+ .addNullableField("ws_net_profit", Schema.FieldType.DOUBLE)
+ .build();
+
+ private static Schema webSiteSchema =
+ Schema.builder()
+ .addField("web_site_sk", Schema.FieldType.STRING)
+ .addField("web_site_id", Schema.FieldType.STRING)
+ .addNullableField("web_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("web_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("web_name", Schema.FieldType.STRING)
+ .addNullableField("web_open_date_sk", Schema.FieldType.INT32)
+ .addNullableField("web_close_date_sk", Schema.FieldType.INT32)
+ .addNullableField("web_class", Schema.FieldType.STRING)
+ .addNullableField("web_manager", Schema.FieldType.STRING)
+ .addNullableField("web_mkt_id", Schema.FieldType.INT32)
+ .addNullableField("web_mkt_class", Schema.FieldType.STRING)
+ .addNullableField("web_mkt_desc", Schema.FieldType.STRING)
+ .addNullableField("web_market_manager", Schema.FieldType.STRING)
+ .addNullableField("web_company_id", Schema.FieldType.INT32)
+ .addNullableField("web_company_name", Schema.FieldType.STRING)
+ .addNullableField("web_street_number", Schema.FieldType.STRING)
+ .addNullableField("web_street_name", Schema.FieldType.STRING)
+ .addNullableField("web_street_type", Schema.FieldType.STRING)
+ .addNullableField("web_suite_number", Schema.FieldType.STRING)
+ .addNullableField("web_city", Schema.FieldType.STRING)
+ .addNullableField("web_county", Schema.FieldType.STRING)
+ .addNullableField("web_state", Schema.FieldType.STRING)
+ .addNullableField("web_zip", Schema.FieldType.STRING)
+ .addNullableField("web_country", Schema.FieldType.STRING)
+ .addNullableField("web_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("web_tax_percentage", Schema.FieldType.DOUBLE)
+ .build();
}
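
As an aside, a minimal usage sketch (not part of this commit) of the immutable map returned by
getTpcdsSchemasImmutableMap() above; the demo class name and the choice of the "store_sales"
table are illustrative assumptions, everything else uses the Beam Schema API as shown in the diff:

    import java.util.Map;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.tpcds.TpcdsSchemas;

    // Hypothetical demo class, not part of the Beam source tree.
    public class TpcdsSchemasDemo {
      public static void main(String[] args) {
        // Look up the schema registered for the store_sales table.
        Map<String, Schema> schemas = TpcdsSchemas.getTpcdsSchemasImmutableMap();
        Schema storeSales = schemas.get("store_sales");
        // Print each field name together with its field type.
        for (Schema.Field field : storeSales.getFields()) {
          System.out.println(field.getName() + ": " + field.getType());
        }
      }
    }
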
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
similarity index 59%
copy from sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
copy to sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
index d1ddc9d..8c2f339 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptionsRegistrar.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/package-info.java
@@ -15,19 +15,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+/** TPC-DS test suite. */
package org.apache.beam.sdk.tpcds;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
-
-/** {@link AutoService} registrar for {@link TpcdsOptions}. */
-@AutoService(PipelineOptionsRegistrar.class)
-public class TpcdsOptionsRegistrar implements PipelineOptionsRegistrar{
-
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.of(TpcdsOptions.class);
- }
-}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
index 5696410..42f7d5b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/QueryReaderTest.java
@@ -18,188 +18,193 @@
package org.apache.beam.sdk.tpcds;
import static org.junit.Assert.assertEquals;
+
import org.junit.Test;
public class QueryReaderTest {
- private final String headers = "-- Licensed to the Apache Software Foundation (ASF) under one\n" +
- "-- or more contributor license agreements. See the NOTICE file\n" +
- "-- distributed with this work for additional information\n" +
- "-- regarding copyright ownership. The ASF licenses this file\n" +
- "-- to you under the Apache License, Version 2.0 (the\n" +
- "-- \"License\"); you may not use this file except in compliance\n" +
- "-- with the License. You may obtain a copy of the License at\n" +
- "--\n" +
- "-- http://www.apache.org/licenses/LICENSE-2.0\n" +
- "--\n" +
- "-- Unless required by applicable law or agreed to in writing, software\n" +
- "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
- "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
- "-- See the License for the specific language governing permissions and\n" +
- "-- limitations under the License.\n";
+ private final String headers =
+ "-- Licensed to the Apache Software Foundation (ASF) under one\n"
+ + "-- or more contributor license agreements. See the NOTICE file\n"
+ + "-- distributed with this work for additional information\n"
+ + "-- regarding copyright ownership. The ASF licenses this file\n"
+ + "-- to you under the Apache License, Version 2.0 (the\n"
+ + "-- \"License\"); you may not use this file except in compliance\n"
+ + "-- with the License. You may obtain a copy of the License at\n"
+ + "--\n"
+ + "-- http://www.apache.org/licenses/LICENSE-2.0\n"
+ + "--\n"
+ + "-- Unless required by applicable law or agreed to in writing, software\n"
+ + "-- distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+ + "-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+ + "-- See the License for the specific language governing permissions and\n"
+ + "-- limitations under the License.\n";
- @Test
- public void testQuery3String() throws Exception {
- String query3String = QueryReader.readQuery("query3");
- String expected = "select dt.d_year \n" +
- " ,item.i_brand_id brand_id \n" +
- " ,item.i_brand brand\n" +
- " ,sum(ss_ext_sales_price) sum_agg\n" +
- " from date_dim dt \n" +
- " ,store_sales\n" +
- " ,item\n" +
- " where dt.d_date_sk = store_sales.ss_sold_date_sk\n" +
- " and store_sales.ss_item_sk = item.i_item_sk\n" +
- " and item.i_manufact_id = 436\n" +
- " and dt.d_moy=12\n" +
- " group by dt.d_year\n" +
- " ,item.i_brand\n" +
- " ,item.i_brand_id\n" +
- " order by dt.d_year\n" +
- " ,sum_agg desc\n" +
- " ,brand_id\n" +
- " limit 100";
- String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
- String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
- assertEquals(expectedNoSpaces, query3StringNoSpaces);
- }
+ @Test
+ public void testQuery3String() throws Exception {
+ String query3String = QueryReader.readQuery("query3");
+ String expected =
+ "select dt.d_year \n"
+ + " ,item.i_brand_id brand_id \n"
+ + " ,item.i_brand brand\n"
+ + " ,sum(ss_ext_sales_price) sum_agg\n"
+ + " from date_dim dt \n"
+ + " ,store_sales\n"
+ + " ,item\n"
+ + " where dt.d_date_sk = store_sales.ss_sold_date_sk\n"
+ + " and store_sales.ss_item_sk = item.i_item_sk\n"
+ + " and item.i_manufact_id = 436\n"
+ + " and dt.d_moy=12\n"
+ + " group by dt.d_year\n"
+ + " ,item.i_brand\n"
+ + " ,item.i_brand_id\n"
+ + " order by dt.d_year\n"
+ + " ,sum_agg desc\n"
+ + " ,brand_id\n"
+ + " limit 100";
+ String query3StringNoSpaces = query3String.replaceAll("\\s+", "");
+ String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+ assertEquals(expectedNoSpaces, query3StringNoSpaces);
+ }
- @Test
- public void testQuery4String() throws Exception {
- String query4String = QueryReader.readQuery("query4");
- String expected = "with year_total as (\n" +
- " select c_customer_id customer_id\n" +
- " ,c_first_name customer_first_name\n" +
- " ,c_last_name customer_last_name\n" +
- " ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
- " ,c_birth_country customer_birth_country\n" +
- " ,c_login customer_login\n" +
- " ,c_email_address customer_email_address\n" +
- " ,d_year dyear\n" +
- " ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n" +
- " ,'s' sale_type\n" +
- " from customer\n" +
- " ,store_sales\n" +
- " ,date_dim\n" +
- " where c_customer_sk = ss_customer_sk\n" +
- " and ss_sold_date_sk = d_date_sk\n" +
- " group by c_customer_id\n" +
- " ,c_first_name\n" +
- " ,c_last_name\n" +
- " ,c_preferred_cust_flag\n" +
- " ,c_birth_country\n" +
- " ,c_login\n" +
- " ,c_email_address\n" +
- " ,d_year\n" +
- " union all\n" +
- " select c_customer_id customer_id\n" +
- " ,c_first_name customer_first_name\n" +
- " ,c_last_name customer_last_name\n" +
- " ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
- " ,c_birth_country customer_birth_country\n" +
- " ,c_login customer_login\n" +
- " ,c_email_address customer_email_address\n" +
- " ,d_year dyear\n" +
- " ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n" +
- " ,'c' sale_type\n" +
- " from customer\n" +
- " ,catalog_sales\n" +
- " ,date_dim\n" +
- " where c_customer_sk = cs_bill_customer_sk\n" +
- " and cs_sold_date_sk = d_date_sk\n" +
- " group by c_customer_id\n" +
- " ,c_first_name\n" +
- " ,c_last_name\n" +
- " ,c_preferred_cust_flag\n" +
- " ,c_birth_country\n" +
- " ,c_login\n" +
- " ,c_email_address\n" +
- " ,d_year\n" +
- "union all\n" +
- " select c_customer_id customer_id\n" +
- " ,c_first_name customer_first_name\n" +
- " ,c_last_name customer_last_name\n" +
- " ,c_preferred_cust_flag customer_preferred_cust_flag\n" +
- " ,c_birth_country customer_birth_country\n" +
- " ,c_login customer_login\n" +
- " ,c_email_address customer_email_address\n" +
- " ,d_year dyear\n" +
- " ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n" +
- " ,'w' sale_type\n" +
- " from customer\n" +
- " ,web_sales\n" +
- " ,date_dim\n" +
- " where c_customer_sk = ws_bill_customer_sk\n" +
- " and ws_sold_date_sk = d_date_sk\n" +
- " group by c_customer_id\n" +
- " ,c_first_name\n" +
- " ,c_last_name\n" +
- " ,c_preferred_cust_flag\n" +
- " ,c_birth_country\n" +
- " ,c_login\n" +
- " ,c_email_address\n" +
- " ,d_year\n" +
- " )\n" +
- " select \n" +
- " t_s_secyear.customer_id\n" +
- " ,t_s_secyear.customer_first_name\n" +
- " ,t_s_secyear.customer_last_name\n" +
- " ,t_s_secyear.customer_email_address\n" +
- " from year_total t_s_firstyear\n" +
- " ,year_total t_s_secyear\n" +
- " ,year_total t_c_firstyear\n" +
- " ,year_total t_c_secyear\n" +
- " ,year_total t_w_firstyear\n" +
- " ,year_total t_w_secyear\n" +
- " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n" +
- " and t_s_firstyear.customer_id = t_c_secyear.customer_id\n" +
- " and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n" +
- " and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n" +
- " and t_s_firstyear.customer_id = t_w_secyear.customer_id\n" +
- " and t_s_firstyear.sale_type = 's'\n" +
- " and t_c_firstyear.sale_type = 'c'\n" +
- " and t_w_firstyear.sale_type = 'w'\n" +
- " and t_s_secyear.sale_type = 's'\n" +
- " and t_c_secyear.sale_type = 'c'\n" +
- " and t_w_secyear.sale_type = 'w'\n" +
- " and t_s_firstyear.dyear = 2001\n" +
- " and t_s_secyear.dyear = 2001+1\n" +
- " and t_c_firstyear.dyear = 2001\n" +
- " and t_c_secyear.dyear = 2001+1\n" +
- " and t_w_firstyear.dyear = 2001\n" +
- " and t_w_secyear.dyear = 2001+1\n" +
- " and t_s_firstyear.year_total > 0\n" +
- " and t_c_firstyear.year_total > 0\n" +
- " and t_w_firstyear.year_total > 0\n" +
- " and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
- " > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n" +
- " and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n" +
- " > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n" +
- " order by t_s_secyear.customer_id\n" +
- " ,t_s_secyear.customer_first_name\n" +
- " ,t_s_secyear.customer_last_name\n" +
- " ,t_s_secyear.customer_email_address\n" +
- "limit 100";
- String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
- String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
- assertEquals(expectedNoSpaces, query4StringNoSpaces);
- }
+ @Test
+ public void testQuery4String() throws Exception {
+ String query4String = QueryReader.readQuery("query4");
+ String expected =
+ "with year_total as (\n"
+ + " select c_customer_id customer_id\n"
+ + " ,c_first_name customer_first_name\n"
+ + " ,c_last_name customer_last_name\n"
+ + " ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+ + " ,c_birth_country customer_birth_country\n"
+ + " ,c_login customer_login\n"
+ + " ,c_email_address customer_email_address\n"
+ + " ,d_year dyear\n"
+ + " ,sum(((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt)+ss_ext_sales_price)/2) year_total\n"
+ + " ,'s' sale_type\n"
+ + " from customer\n"
+ + " ,store_sales\n"
+ + " ,date_dim\n"
+ + " where c_customer_sk = ss_customer_sk\n"
+ + " and ss_sold_date_sk = d_date_sk\n"
+ + " group by c_customer_id\n"
+ + " ,c_first_name\n"
+ + " ,c_last_name\n"
+ + " ,c_preferred_cust_flag\n"
+ + " ,c_birth_country\n"
+ + " ,c_login\n"
+ + " ,c_email_address\n"
+ + " ,d_year\n"
+ + " union all\n"
+ + " select c_customer_id customer_id\n"
+ + " ,c_first_name customer_first_name\n"
+ + " ,c_last_name customer_last_name\n"
+ + " ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+ + " ,c_birth_country customer_birth_country\n"
+ + " ,c_login customer_login\n"
+ + " ,c_email_address customer_email_address\n"
+ + " ,d_year dyear\n"
+ + " ,sum((((cs_ext_list_price-cs_ext_wholesale_cost-cs_ext_discount_amt)+cs_ext_sales_price)/2) ) year_total\n"
+ + " ,'c' sale_type\n"
+ + " from customer\n"
+ + " ,catalog_sales\n"
+ + " ,date_dim\n"
+ + " where c_customer_sk = cs_bill_customer_sk\n"
+ + " and cs_sold_date_sk = d_date_sk\n"
+ + " group by c_customer_id\n"
+ + " ,c_first_name\n"
+ + " ,c_last_name\n"
+ + " ,c_preferred_cust_flag\n"
+ + " ,c_birth_country\n"
+ + " ,c_login\n"
+ + " ,c_email_address\n"
+ + " ,d_year\n"
+ + "union all\n"
+ + " select c_customer_id customer_id\n"
+ + " ,c_first_name customer_first_name\n"
+ + " ,c_last_name customer_last_name\n"
+ + " ,c_preferred_cust_flag customer_preferred_cust_flag\n"
+ + " ,c_birth_country customer_birth_country\n"
+ + " ,c_login customer_login\n"
+ + " ,c_email_address customer_email_address\n"
+ + " ,d_year dyear\n"
+ + " ,sum((((ws_ext_list_price-ws_ext_wholesale_cost-ws_ext_discount_amt)+ws_ext_sales_price)/2) ) year_total\n"
+ + " ,'w' sale_type\n"
+ + " from customer\n"
+ + " ,web_sales\n"
+ + " ,date_dim\n"
+ + " where c_customer_sk = ws_bill_customer_sk\n"
+ + " and ws_sold_date_sk = d_date_sk\n"
+ + " group by c_customer_id\n"
+ + " ,c_first_name\n"
+ + " ,c_last_name\n"
+ + " ,c_preferred_cust_flag\n"
+ + " ,c_birth_country\n"
+ + " ,c_login\n"
+ + " ,c_email_address\n"
+ + " ,d_year\n"
+ + " )\n"
+ + " select \n"
+ + " t_s_secyear.customer_id\n"
+ + " ,t_s_secyear.customer_first_name\n"
+ + " ,t_s_secyear.customer_last_name\n"
+ + " ,t_s_secyear.customer_email_address\n"
+ + " from year_total t_s_firstyear\n"
+ + " ,year_total t_s_secyear\n"
+ + " ,year_total t_c_firstyear\n"
+ + " ,year_total t_c_secyear\n"
+ + " ,year_total t_w_firstyear\n"
+ + " ,year_total t_w_secyear\n"
+ + " where t_s_secyear.customer_id = t_s_firstyear.customer_id\n"
+ + " and t_s_firstyear.customer_id = t_c_secyear.customer_id\n"
+ + " and t_s_firstyear.customer_id = t_c_firstyear.customer_id\n"
+ + " and t_s_firstyear.customer_id = t_w_firstyear.customer_id\n"
+ + " and t_s_firstyear.customer_id = t_w_secyear.customer_id\n"
+ + " and t_s_firstyear.sale_type = 's'\n"
+ + " and t_c_firstyear.sale_type = 'c'\n"
+ + " and t_w_firstyear.sale_type = 'w'\n"
+ + " and t_s_secyear.sale_type = 's'\n"
+ + " and t_c_secyear.sale_type = 'c'\n"
+ + " and t_w_secyear.sale_type = 'w'\n"
+ + " and t_s_firstyear.dyear = 2001\n"
+ + " and t_s_secyear.dyear = 2001+1\n"
+ + " and t_c_firstyear.dyear = 2001\n"
+ + " and t_c_secyear.dyear = 2001+1\n"
+ + " and t_w_firstyear.dyear = 2001\n"
+ + " and t_w_secyear.dyear = 2001+1\n"
+ + " and t_s_firstyear.year_total > 0\n"
+ + " and t_c_firstyear.year_total > 0\n"
+ + " and t_w_firstyear.year_total > 0\n"
+ + " and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+ + " > case when t_s_firstyear.year_total > 0 then t_s_secyear.year_total / t_s_firstyear.year_total else null end\n"
+ + " and case when t_c_firstyear.year_total > 0 then t_c_secyear.year_total / t_c_firstyear.year_total else null end\n"
+ + " > case when t_w_firstyear.year_total > 0 then t_w_secyear.year_total / t_w_firstyear.year_total else null end\n"
+ + " order by t_s_secyear.customer_id\n"
+ + " ,t_s_secyear.customer_first_name\n"
+ + " ,t_s_secyear.customer_last_name\n"
+ + " ,t_s_secyear.customer_email_address\n"
+ + "limit 100";
+ String query4StringNoSpaces = query4String.replaceAll("\\s+", "");
+ String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+ assertEquals(expectedNoSpaces, query4StringNoSpaces);
+ }
- @Test
- public void testQuery55String() throws Exception {
- String query55String = QueryReader.readQuery("query55");
- String expected = "select i_brand_id brand_id, i_brand brand,\n" +
- " \tsum(ss_ext_sales_price) ext_price\n" +
- " from date_dim, store_sales, item\n" +
- " where d_date_sk = ss_sold_date_sk\n" +
- " \tand ss_item_sk = i_item_sk\n" +
- " \tand i_manager_id=36\n" +
- " \tand d_moy=12\n" +
- " \tand d_year=2001\n" +
- " group by i_brand, i_brand_id\n" +
- " order by ext_price desc, i_brand_id\n" +
- "limit 100";
- String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
- String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
- assertEquals(expectedNoSpaces, query55StringNoSpaces);
- }
+ @Test
+ public void testQuery55String() throws Exception {
+ String query55String = QueryReader.readQuery("query55");
+ String expected =
+ "select i_brand_id brand_id, i_brand brand,\n"
+ + " \tsum(ss_ext_sales_price) ext_price\n"
+ + " from date_dim, store_sales, item\n"
+ + " where d_date_sk = ss_sold_date_sk\n"
+ + " \tand ss_item_sk = i_item_sk\n"
+ + " \tand i_manager_id=36\n"
+ + " \tand d_moy=12\n"
+ + " \tand d_year=2001\n"
+ + " group by i_brand, i_brand_id\n"
+ + " order by ext_price desc, i_brand_id\n"
+ + "limit 100";
+ String query55StringNoSpaces = query55String.replaceAll("\\s+", "");
+ String expectedNoSpaces = (headers + expected).replaceAll("\\s+", "");
+ assertEquals(expectedNoSpaces, query55StringNoSpaces);
+ }
}
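
All three tests above compare query strings only after stripping whitespace with
replaceAll("\\s+", ""). A small helper capturing that pattern could look like the sketch below;
the helper name is hypothetical and is not part of this change:

    // Hypothetical helper: asserts SQL string equality ignoring all whitespace,
    // mirroring the replaceAll("\\s+", "") normalization used in the tests above.
    private static void assertEqualsIgnoringWhitespace(String expected, String actual) {
      assertEquals(expected.replaceAll("\\s+", ""), actual.replaceAll("\\s+", ""));
    }
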
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
index 7748bee..1d597f0 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java
@@ -18,134 +18,160 @@
package org.apache.beam.sdk.tpcds;
import static org.junit.Assert.assertEquals;
-import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-
+import org.junit.Test;
public class TableSchemaJSONLoaderTest {
- @Test
- public void testStoreReturnsTable() throws Exception {
- String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
- String expected = "sr_returned_date_sk bigint,"
- + "sr_return_time_sk bigint,"
- + "sr_item_sk bigint,"
- + "sr_customer_sk bigint,"
- + "sr_cdemo_sk bigint,"
- + "sr_hdemo_sk bigint,"
- + "sr_addr_sk bigint,"
- + "sr_store_sk bigint,"
- + "sr_reason_sk bigint,"
- + "sr_ticket_number bigint,"
- + "sr_return_quantity bigint,"
- + "sr_return_amt double,"
- + "sr_return_tax double,"
- + "sr_return_amt_inc_tax double,"
- + "sr_fee double,"
- + "sr_return_ship_cost double,"
- + "sr_refunded_cash double,"
- + "sr_reversed_charge double,"
- + "sr_store_credit double,"
- + "sr_net_loss double";
- assertEquals(expected, storeReturnsSchemaString);
- }
+ @Test
+ public void testStoreReturnsTable() throws Exception {
+ String storeReturnsSchemaString = TableSchemaJSONLoader.parseTableSchema("store_returns");
+ String expected =
+ "sr_returned_date_sk bigint,"
+ + "sr_return_time_sk bigint,"
+ + "sr_item_sk bigint,"
+ + "sr_customer_sk bigint,"
+ + "sr_cdemo_sk bigint,"
+ + "sr_hdemo_sk bigint,"
+ + "sr_addr_sk bigint,"
+ + "sr_store_sk bigint,"
+ + "sr_reason_sk bigint,"
+ + "sr_ticket_number bigint,"
+ + "sr_return_quantity bigint,"
+ + "sr_return_amt double,"
+ + "sr_return_tax double,"
+ + "sr_return_amt_inc_tax double,"
+ + "sr_fee double,"
+ + "sr_return_ship_cost double,"
+ + "sr_refunded_cash double,"
+ + "sr_reversed_charge double,"
+ + "sr_store_credit double,"
+ + "sr_net_loss double";
+ assertEquals(expected, storeReturnsSchemaString);
+ }
- @Test
- public void testItemTable() throws Exception {
- String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
- String expected = "i_item_sk bigint,"
- + "i_item_id varchar,"
- + "i_rec_start_date varchar,"
- + "i_rec_end_date varchar,"
- + "i_item_desc varchar,"
- + "i_current_price double,"
- + "i_wholesale_cost double,"
- + "i_brand_id bigint,"
- + "i_brand varchar,"
- + "i_class_id bigint,"
- + "i_class varchar,"
- + "i_category_id bigint,"
- + "i_category varchar,"
- + "i_manufact_id bigint,"
- + "i_manufact varchar,"
- + "i_size varchar,"
- + "i_formulation varchar,"
- + "i_color varchar,"
- + "i_units varchar,"
- + "i_container varchar,"
- + "i_manager_id bigint,"
- + "i_product_name varchar";
- assertEquals(expected, itemSchemaString);
- }
+ @Test
+ public void testItemTable() throws Exception {
+ String itemSchemaString = TableSchemaJSONLoader.parseTableSchema("item");
+ String expected =
+ "i_item_sk bigint,"
+ + "i_item_id varchar,"
+ + "i_rec_start_date varchar,"
+ + "i_rec_end_date varchar,"
+ + "i_item_desc varchar,"
+ + "i_current_price double,"
+ + "i_wholesale_cost double,"
+ + "i_brand_id bigint,"
+ + "i_brand varchar,"
+ + "i_class_id bigint,"
+ + "i_class varchar,"
+ + "i_category_id bigint,"
+ + "i_category varchar,"
+ + "i_manufact_id bigint,"
+ + "i_manufact varchar,"
+ + "i_size varchar,"
+ + "i_formulation varchar,"
+ + "i_color varchar,"
+ + "i_units varchar,"
+ + "i_container varchar,"
+ + "i_manager_id bigint,"
+ + "i_product_name varchar";
+ assertEquals(expected, itemSchemaString);
+ }
- @Test
- public void testDateDimTable() throws Exception {
- String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
- String expected = "d_date_sk bigint,"
- + "d_date_id varchar,"
- + "d_date varchar,"
- + "d_month_seq bigint,"
- + "d_week_seq bigint,"
- + "d_quarter_seq bigint,"
- + "d_year bigint,"
- + "d_dow bigint,"
- + "d_moy bigint,"
- + "d_dom bigint,"
- + "d_qoy bigint,"
- + "d_fy_year bigint,"
- + "d_fy_quarter_seq bigint,"
- + "d_fy_week_seq bigint,"
- + "d_day_name varchar,"
- + "d_quarter_name varchar,"
- + "d_holiday varchar,"
- + "d_weekend varchar,"
- + "d_following_holiday varchar,"
- + "d_first_dom bigint,"
- + "d_last_dom bigint,"
- + "d_same_day_ly bigint,"
- + "d_same_day_lq bigint,"
- + "d_current_day varchar,"
- + "d_current_week varchar,"
- + "d_current_month varchar,"
- + "d_current_quarter varchar,"
- + "d_current_year varchar";
- assertEquals(expected, dateDimSchemaString);
- }
+ @Test
+ public void testDateDimTable() throws Exception {
+ String dateDimSchemaString = TableSchemaJSONLoader.parseTableSchema("date_dim");
+ String expected =
+ "d_date_sk bigint,"
+ + "d_date_id varchar,"
+ + "d_date varchar,"
+ + "d_month_seq bigint,"
+ + "d_week_seq bigint,"
+ + "d_quarter_seq bigint,"
+ + "d_year bigint,"
+ + "d_dow bigint,"
+ + "d_moy bigint,"
+ + "d_dom bigint,"
+ + "d_qoy bigint,"
+ + "d_fy_year bigint,"
+ + "d_fy_quarter_seq bigint,"
+ + "d_fy_week_seq bigint,"
+ + "d_day_name varchar,"
+ + "d_quarter_name varchar,"
+ + "d_holiday varchar,"
+ + "d_weekend varchar,"
+ + "d_following_holiday varchar,"
+ + "d_first_dom bigint,"
+ + "d_last_dom bigint,"
+ + "d_same_day_ly bigint,"
+ + "d_same_day_lq bigint,"
+ + "d_current_day varchar,"
+ + "d_current_week varchar,"
+ + "d_current_month varchar,"
+ + "d_current_quarter varchar,"
+ + "d_current_year varchar";
+ assertEquals(expected, dateDimSchemaString);
+ }
- @Test
- public void testWarehouseTable() throws Exception {
- String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
- String expected = "w_warehouse_sk bigint,"
- + "w_warehouse_id varchar,"
- + "w_warehouse_name varchar,"
- + "w_warehouse_sq_ft bigint,"
- + "w_street_number varchar,"
- + "w_street_name varchar,"
- + "w_street_type varchar,"
- + "w_suite_number varchar,"
- + "w_city varchar,"
- + "w_county varchar,"
- + "w_state varchar,"
- + "w_zip varchar,"
- + "w_country varchar,"
- + "w_gmt_offset double";
- assertEquals(expected, warehouseSchemaString);
- }
+ @Test
+ public void testWarehouseTable() throws Exception {
+ String warehouseSchemaString = TableSchemaJSONLoader.parseTableSchema("warehouse");
+ String expected =
+ "w_warehouse_sk bigint,"
+ + "w_warehouse_id varchar,"
+ + "w_warehouse_name varchar,"
+ + "w_warehouse_sq_ft bigint,"
+ + "w_street_number varchar,"
+ + "w_street_name varchar,"
+ + "w_street_type varchar,"
+ + "w_suite_number varchar,"
+ + "w_city varchar,"
+ + "w_county varchar,"
+ + "w_state varchar,"
+ + "w_zip varchar,"
+ + "w_country varchar,"
+ + "w_gmt_offset double";
+ assertEquals(expected, warehouseSchemaString);
+ }
- @Test
- public void testGetAllTableNames() {
- List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
- Collections.sort(tableNames);
- List<String> expectedTableNames = Arrays.asList("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics",
- "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim",
- "warehouse", "web_page", "web_returns", "web_sales", "web_site");
+ @Test
+ public void testGetAllTableNames() {
+ List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+ Collections.sort(tableNames);
+ List<String> expectedTableNames =
+ Arrays.asList(
+ "call_center",
+ "catalog_page",
+ "catalog_returns",
+ "catalog_sales",
+ "customer",
+ "customer_address",
+ "customer_demographics",
+ "date_dim",
+ "household_demographics",
+ "income_band",
+ "inventory",
+ "item",
+ "promotion",
+ "reason",
+ "ship_mode",
+ "store",
+ "store_returns",
+ "store_sales",
+ "time_dim",
+ "warehouse",
+ "web_page",
+ "web_returns",
+ "web_sales",
+ "web_site");
- assertEquals(expectedTableNames.size(), tableNames.size());
+ assertEquals(expectedTableNames.size(), tableNames.size());
- for (int i = 0; i < tableNames.size(); i++) {
- assertEquals(expectedTableNames.get(i), tableNames.get(i));
- }
+ for (int i = 0; i < tableNames.size(); i++) {
+ assertEquals(expectedTableNames.get(i), tableNames.get(i));
}
+ }
}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
index 3f8c951..2cd1b0b 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsParametersReaderTest.java
@@ -17,76 +17,76 @@
*/
package org.apache.beam.sdk.tpcds;
+import static org.junit.Assert.assertEquals;
+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class TpcdsParametersReaderTest {
- private TpcdsOptions tpcdsOptions;
- private TpcdsOptions tpcdsOptionsError;
+ private TpcdsOptions tpcdsOptions;
+ private TpcdsOptions tpcdsOptionsError;
- @Before
- public void initializeTpcdsOptions() {
- tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
- tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
+ @Before
+ public void initializeTpcdsOptions() {
+ tpcdsOptions = PipelineOptionsFactory.as(TpcdsOptions.class);
+ tpcdsOptionsError = PipelineOptionsFactory.as(TpcdsOptions.class);
- tpcdsOptions.setDataSize("1G");
- tpcdsOptions.setQueries("1,2,3");
- tpcdsOptions.setTpcParallel(2);
+ tpcdsOptions.setDataSize("1G");
+ tpcdsOptions.setQueries("1,2,3");
+ tpcdsOptions.setTpcParallel(2);
- tpcdsOptionsError.setDataSize("5G");
- tpcdsOptionsError.setQueries("0,100");
- tpcdsOptionsError.setTpcParallel(0);
- }
+ tpcdsOptionsError.setDataSize("5G");
+ tpcdsOptionsError.setQueries("0,100");
+ tpcdsOptionsError.setTpcParallel(0);
+ }
- @Test
- public void testGetAndCheckDataSize() throws Exception {
- String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
- String expected = "1G";
- assertEquals(expected, dataSize);
- }
+ @Test
+ public void testGetAndCheckDataSize() throws Exception {
+ String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+ String expected = "1G";
+ assertEquals(expected, dataSize);
+ }
- @Test( expected = Exception.class)
- public void testGetAndCheckDataSizeException() throws Exception {
- TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
- }
+ @Test(expected = Exception.class)
+ public void testGetAndCheckDataSizeException() throws Exception {
+ TpcdsParametersReader.getAndCheckDataSize(tpcdsOptionsError);
+ }
- @Test
- public void testGetAndCheckQueries() throws Exception {
- TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
- tpcdsOptionsAll.setQueries("all");
- String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
- String[] expected = new String[99];
- for (int i = 0; i < 99; i++) {
- expected[i] = "query" + (i + 1);
- }
- Assert.assertArrayEquals(expected, queryNameArray);
+ @Test
+ public void testGetAndCheckAllQueries() throws Exception {
+ TpcdsOptions tpcdsOptionsAll = PipelineOptionsFactory.as(TpcdsOptions.class);
+ tpcdsOptionsAll.setQueries("all");
+ String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsAll);
+ String[] expected = new String[99];
+ for (int i = 0; i < 99; i++) {
+ expected[i] = "query" + (i + 1);
}
+ Assert.assertArrayEquals(expected, queryNameArray);
+ }
- @Test
- public void testGetAndCheckAllQueries() throws Exception {
- String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
- String[] expected = {"query1", "query2", "query3"};
- Assert.assertArrayEquals(expected, queryNameArray);
- }
+ @Test
+ public void testGetAndCheckQueries() throws Exception {
+ String[] queryNameArray = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+ String[] expected = {"query1", "query2", "query3"};
+ Assert.assertArrayEquals(expected, queryNameArray);
+ }
- @Test( expected = Exception.class)
- public void testGetAndCheckQueriesException() throws Exception {
- TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
- }
+ @Test(expected = Exception.class)
+ public void testGetAndCheckQueriesException() throws Exception {
+ TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptionsError);
+ }
- @Test
- public void testGetAndCheckTpcParallel() throws Exception {
- int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
- int expected = 2;
- assertEquals(expected, nThreads);
- }
+ @Test
+ public void testGetAndCheckTpcParallel() throws Exception {
+ int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+ int expected = 2;
+ assertEquals(expected, nThreads);
+ }
- @Test( expected = Exception.class)
- public void ttestGetAndCheckTpcParallelException() throws Exception {
- TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
- }
+ @Test(expected = Exception.class)
+ public void testGetAndCheckTpcParallelException() throws Exception {
+ TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptionsError);
+ }
}
diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
index 05402f2..9c43935 100644
--- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
+++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TpcdsSchemasTest.java
@@ -17,108 +17,107 @@
*/
package org.apache.beam.sdk.tpcds;
-import org.apache.beam.sdk.schemas.Schema;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.junit.Before;
+import org.junit.Test;
public class TpcdsSchemasTest {
- private Map<String, Schema> schemaMap;
- private Map<String, Schema> immutableSchemaMap;
+ private Map<String, Schema> schemaMap;
+ private Map<String, Schema> immutableSchemaMap;
- @Before
- public void initializeMaps() throws Exception {
- schemaMap = TpcdsSchemas.getTpcdsSchemas();
- immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
- }
+ @Before
+ public void initializeMaps() throws Exception {
+ schemaMap = TpcdsSchemas.getTpcdsSchemas();
+ immutableSchemaMap = TpcdsSchemas.getTpcdsSchemasImmutableMap();
+ }
- @Test
- public void testCallCenterSchema() throws Exception {
- Schema callCenterSchema =
- Schema.builder()
- .addField("cc_call_center_sk", Schema.FieldType.INT64)
- .addField("cc_call_center_id", Schema.FieldType.STRING)
- .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
- .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
- .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
- .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
- .addNullableField("cc_name", Schema.FieldType.STRING)
- .addNullableField("cc_class", Schema.FieldType.STRING)
- .addNullableField("cc_employees", Schema.FieldType.INT64)
- .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
- .addNullableField("cc_hours", Schema.FieldType.STRING)
- .addNullableField("cc_manager", Schema.FieldType.STRING)
- .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
- .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
- .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
- .addNullableField("cc_market_manager", Schema.FieldType.STRING)
- .addNullableField("cc_division", Schema.FieldType.INT64)
- .addNullableField("cc_division_name", Schema.FieldType.STRING)
- .addNullableField("cc_company", Schema.FieldType.INT64)
- .addNullableField("cc_company_name", Schema.FieldType.STRING)
- .addNullableField("cc_street_number", Schema.FieldType.STRING)
- .addNullableField("cc_street_name", Schema.FieldType.STRING)
- .addNullableField("cc_street_type", Schema.FieldType.STRING)
- .addNullableField("cc_suite_number", Schema.FieldType.STRING)
- .addNullableField("cc_city", Schema.FieldType.STRING)
- .addNullableField("cc_county", Schema.FieldType.STRING)
- .addNullableField("cc_state", Schema.FieldType.STRING)
- .addNullableField("cc_zip", Schema.FieldType.STRING)
- .addNullableField("cc_country", Schema.FieldType.STRING)
- .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
- .build();
+ @Test
+ public void testCallCenterSchema() throws Exception {
+ Schema callCenterSchema =
+ Schema.builder()
+ .addField("cc_call_center_sk", Schema.FieldType.INT64)
+ .addField("cc_call_center_id", Schema.FieldType.STRING)
+ .addNullableField("cc_rec_start_date", Schema.FieldType.STRING)
+ .addNullableField("cc_rec_end_date", Schema.FieldType.STRING)
+ .addNullableField("cc_closed_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cc_open_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cc_name", Schema.FieldType.STRING)
+ .addNullableField("cc_class", Schema.FieldType.STRING)
+ .addNullableField("cc_employees", Schema.FieldType.INT64)
+ .addNullableField("cc_sq_ft", Schema.FieldType.INT64)
+ .addNullableField("cc_hours", Schema.FieldType.STRING)
+ .addNullableField("cc_manager", Schema.FieldType.STRING)
+ .addNullableField("cc_mkt_id", Schema.FieldType.INT64)
+ .addNullableField("cc_mkt_class", Schema.FieldType.STRING)
+ .addNullableField("cc_mkt_desc", Schema.FieldType.STRING)
+ .addNullableField("cc_market_manager", Schema.FieldType.STRING)
+ .addNullableField("cc_division", Schema.FieldType.INT64)
+ .addNullableField("cc_division_name", Schema.FieldType.STRING)
+ .addNullableField("cc_company", Schema.FieldType.INT64)
+ .addNullableField("cc_company_name", Schema.FieldType.STRING)
+ .addNullableField("cc_street_number", Schema.FieldType.STRING)
+ .addNullableField("cc_street_name", Schema.FieldType.STRING)
+ .addNullableField("cc_street_type", Schema.FieldType.STRING)
+ .addNullableField("cc_suite_number", Schema.FieldType.STRING)
+ .addNullableField("cc_city", Schema.FieldType.STRING)
+ .addNullableField("cc_county", Schema.FieldType.STRING)
+ .addNullableField("cc_state", Schema.FieldType.STRING)
+ .addNullableField("cc_zip", Schema.FieldType.STRING)
+ .addNullableField("cc_country", Schema.FieldType.STRING)
+ .addNullableField("cc_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("cc_tax_percentage", Schema.FieldType.DOUBLE)
+ .build();
- assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
- assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
- }
+ assertNotEquals(schemaMap.get("call_center"), callCenterSchema);
+ assertEquals(immutableSchemaMap.get("call_center"), callCenterSchema);
+ }
- @Test
- public void testCatalogPageSchemaNullable() throws Exception {
- Schema catalogPageSchemaNullable =
- Schema.builder()
- .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
- .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
- .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
- .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
- .addNullableField("cp_department", Schema.FieldType.STRING)
- .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
- .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
- .addNullableField("cp_description", Schema.FieldType.STRING)
- .addNullableField("cp_type", Schema.FieldType.STRING)
- .build();
+ @Test
+ public void testCatalogPageSchemaNullable() throws Exception {
+ Schema catalogPageSchemaNullable =
+ Schema.builder()
+ .addNullableField("cp_catalog_page_sk", Schema.FieldType.INT64)
+ .addNullableField("cp_catalog_page_id", Schema.FieldType.STRING)
+ .addNullableField("cp_start_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cp_end_date_sk", Schema.FieldType.INT64)
+ .addNullableField("cp_department", Schema.FieldType.STRING)
+ .addNullableField("cp_catalog_number", Schema.FieldType.INT64)
+ .addNullableField("cp_catalog_page_number", Schema.FieldType.INT64)
+ .addNullableField("cp_description", Schema.FieldType.STRING)
+ .addNullableField("cp_type", Schema.FieldType.STRING)
+ .build();
- assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
- assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
- assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
- }
+ assertEquals(schemaMap.get("catalog_page"), catalogPageSchemaNullable);
+ assertNotEquals(schemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+ assertEquals(immutableSchemaMap.get("catalog_page"), TpcdsSchemas.getCatalogPageSchema());
+ }
- @Test
- public void testCustomerAddressSchemaNullable() throws Exception {
- Schema customerAddressSchemaNullable =
- Schema.builder()
- .addNullableField("ca_address_sk", Schema.FieldType.INT64)
- .addNullableField("ca_address_id", Schema.FieldType.STRING)
- .addNullableField("ca_street_number", Schema.FieldType.STRING)
- .addNullableField("ca_street_name", Schema.FieldType.STRING)
- .addNullableField("ca_street_type", Schema.FieldType.STRING)
- .addNullableField("ca_suite_number", Schema.FieldType.STRING)
- .addNullableField("ca_city", Schema.FieldType.STRING)
- .addNullableField("ca_county", Schema.FieldType.STRING)
- .addNullableField("ca_state", Schema.FieldType.STRING)
- .addNullableField("ca_zip", Schema.FieldType.STRING)
- .addNullableField("ca_country", Schema.FieldType.STRING)
- .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
- .addNullableField("ca_location_type", Schema.FieldType.STRING)
- .build();
+ @Test
+ public void testCustomerAddressSchemaNullable() throws Exception {
+ Schema customerAddressSchemaNullable =
+ Schema.builder()
+ .addNullableField("ca_address_sk", Schema.FieldType.INT64)
+ .addNullableField("ca_address_id", Schema.FieldType.STRING)
+ .addNullableField("ca_street_number", Schema.FieldType.STRING)
+ .addNullableField("ca_street_name", Schema.FieldType.STRING)
+ .addNullableField("ca_street_type", Schema.FieldType.STRING)
+ .addNullableField("ca_suite_number", Schema.FieldType.STRING)
+ .addNullableField("ca_city", Schema.FieldType.STRING)
+ .addNullableField("ca_county", Schema.FieldType.STRING)
+ .addNullableField("ca_state", Schema.FieldType.STRING)
+ .addNullableField("ca_zip", Schema.FieldType.STRING)
+ .addNullableField("ca_country", Schema.FieldType.STRING)
+ .addNullableField("ca_gmt_offset", Schema.FieldType.DOUBLE)
+ .addNullableField("ca_location_type", Schema.FieldType.STRING)
+ .build();
- assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
- assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
- assertEquals(immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
- }
+ assertEquals(schemaMap.get("customer_address"), customerAddressSchemaNullable);
+ assertNotEquals(schemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+ assertEquals(
+ immutableSchemaMap.get("customer_address"), TpcdsSchemas.getCustomerAddressSchema());
+ }
}