You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 03:52:13 UTC
[flink] 02/02: [FLINK-13436][e2e] Add TPC-H queries as E2E tests
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 274ff58e52c826c5f358ddbc539c96de4d8af801
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Aug 1 15:55:18 2019 +0800
[FLINK-13436][e2e] Add TPC-H queries as E2E tests
This closes #9312
---
flink-end-to-end-tests/flink-tpch-test/pom.xml | 60 ++++
.../apache/flink/table/tpch/TpchDataGenerator.java | 127 ++++++++
.../flink/table/tpch/TpchResultComparator.java | 123 ++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
.../test-data/tpch/modified-query/q11.sql | 30 ++
.../test-data/tpch/modified-query/q15.sql | 69 ++++
.../test-data/tpch/modified-query/q20.sql | 36 +++
.../test-data/tpch/modified-query/q6.sql | 11 +
.../test-scripts/test-data/tpch/sink/q1.yaml | 51 +++
.../test-scripts/test-data/tpch/sink/q10.yaml | 43 +++
.../test-scripts/test-data/tpch/sink/q11.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q12.yaml | 23 ++
.../test-scripts/test-data/tpch/sink/q13.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q14.yaml | 15 +
.../test-scripts/test-data/tpch/sink/q15.yaml | 31 ++
.../test-scripts/test-data/tpch/sink/q16.yaml | 27 ++
.../test-scripts/test-data/tpch/sink/q17.yaml | 15 +
.../test-scripts/test-data/tpch/sink/q18.yaml | 35 +++
.../test-scripts/test-data/tpch/sink/q19.yaml | 15 +
.../test-scripts/test-data/tpch/sink/q2.yaml | 43 +++
.../test-scripts/test-data/tpch/sink/q20.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q21.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q22.yaml | 23 ++
.../test-scripts/test-data/tpch/sink/q3.yaml | 27 ++
.../test-scripts/test-data/tpch/sink/q4.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q5.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q6.yaml | 15 +
.../test-scripts/test-data/tpch/sink/q7.yaml | 27 ++
.../test-scripts/test-data/tpch/sink/q8.yaml | 19 ++
.../test-scripts/test-data/tpch/sink/q9.yaml | 23 ++
.../test-scripts/test-data/tpch/source.yaml | 349 +++++++++++++++++++++
flink-end-to-end-tests/test-scripts/test_tpch.sh | 91 ++++++
pom.xml | 2 +-
34 files changed, 1446 insertions(+), 1 deletion(-)
diff --git a/flink-end-to-end-tests/flink-tpch-test/pom.xml b/flink-end-to-end-tests/flink-tpch-test/pom.xml
new file mode 100644
index 0000000..e317f31
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.10-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-tpch-test</artifactId>
+
+ <!-- TPC-H library; TpchDataGenerator in this module imports io.airlift.tpch.TpchEntity and io.airlift.tpch.TpchTable from it. -->
+ <dependencies>
+ <dependency>
+ <groupId>io.airlift.tpch</groupId>
+ <artifactId>tpch</artifactId>
+ <version>0.10</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Copies this module's dependencies into ${project.build.directory}/lib during the package phase. -->
+ <!-- NOTE(review): presumably so the test script can build a classpath from that directory; confirm against test_tpch.sh. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.10</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java
new file mode 100644
index 0000000..4646a02
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.tpch;
+
+import io.airlift.tpch.TpchEntity;
+import io.airlift.tpch.TpchTable;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+/**
+ * TPC-H test data generator.
+ */
+public class TpchDataGenerator {
+
+ public static final int QUERY_NUM = 22;
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2) {
+ System.out.println("Exactly 1 double value and 1 path should be provided as argument");
+ return;
+ }
+
+ double scale = Double.valueOf(args[0]);
+ String path = args[1];
+ generateTable(scale, path);
+ generateQuery(path);
+ generateExpected(path);
+ }
+
+ private static void generateTable(double scale, String path) throws IOException {
+ File dir = new File(path + "/table");
+ dir.mkdir();
+
+ for (TpchTable table : TpchTable.getTables()) {
+ Iterable generator = table.createGenerator(scale, 1, 1);
+
+ StringBuilder builder = new StringBuilder();
+ generator.forEach(s -> {
+ String line = ((TpchEntity) s).toLine().trim();
+ if (line.endsWith("|")) {
+ line = line.substring(0, line.length() - 1);
+ }
+ builder.append(line).append('\n');
+ });
+
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(
+ new FileOutputStream(path + "/table/" + table.getTableName() + ".csv")))
+ ) {
+ writer.write(builder.toString());
+ }
+ }
+ }
+
+ private static void generateQuery(String path) throws IOException {
+ File dir = new File(path + "/query");
+ dir.mkdir();
+
+ for (int i = 0; i < QUERY_NUM; i++) {
+ try (
+ InputStream in = TpchDataGenerator.class.getResourceAsStream(
+ "/io/airlift/tpch/queries/q" + (i + 1) + ".sql");
+ OutputStream out = new FileOutputStream(path + "/query/q" + (i + 1) + ".sql")
+ ) {
+ byte[] buffer = new byte[4096];
+ int bytesRead = 0;
+ while ((bytesRead = in.read(buffer)) > 0) {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+ }
+ }
+
+ private static void generateExpected(String path) throws IOException {
+ File dir = new File(path + "/expected");
+ dir.mkdir();
+
+ for (int i = 0; i < QUERY_NUM; i++) {
+ try (
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(TpchDataGenerator.class.getResourceAsStream(
+ "/io/airlift/tpch/queries/q" + (i + 1) + ".result")));
+ BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(
+ new FileOutputStream(path + "/expected/q" + (i + 1) + ".csv")))
+ ) {
+ int lineNumber = 0;
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim().replace("null", "");
+ lineNumber++;
+ if (lineNumber == 1) {
+ continue;
+ }
+ if (line.length() > 0 && line.endsWith("|")) {
+ line = line.substring(0, line.length() - 1);
+ }
+ writer.write(line + "\n");
+ }
+ }
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java
new file mode 100644
index 0000000..e9bbea3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.tpch;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+/**
+ * Result comparator for TPC-H test, according to the TPC-H standard specification v2.18.0.
+ */
+public class TpchResultComparator {
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2) {
+ System.out.println(
+ "Exactly 2 paths must be provided, the expected result path and the actual result path");
+ System.exit(1);
+ }
+
+ String expectedPath = args[0];
+ String actualPath = args[1];
+
+ try (
+ BufferedReader expectedReader = new BufferedReader(new FileReader(expectedPath));
+ BufferedReader actualReader = new BufferedReader(new FileReader(actualPath))
+ ) {
+ int expectedLineNum = 0;
+ int actualLineNum = 0;
+
+ String expectedLine, actualLine;
+ while (
+ (expectedLine = expectedReader.readLine()) != null &&
+ (actualLine = actualReader.readLine()) != null
+ ) {
+ String[] expected = expectedLine.split("\\|");
+ expectedLineNum++;
+ String[] actual = actualLine.split("\\|");
+ actualLineNum++;
+
+ if (expected.length != actual.length) {
+ System.out.println(
+ "Incorrect number of columns on line " + actualLineNum +
+ "! Expecting " + expected.length + " columns, but found " + actual.length + " columns.");
+ System.exit(1);
+ }
+ for (int i = 0; i < expected.length; i++) {
+ boolean failed;
+ try {
+ long e = Long.valueOf(expected[i]);
+ long a = Long.valueOf(actual[i]);
+ failed = (e != a);
+ } catch (NumberFormatException nfe) {
+ try {
+ double e = Double.valueOf(expected[i]);
+ double a = Double.valueOf(actual[i]);
+ if (e < 0 && a > 0 || e > 0 && a < 0) {
+ failed = true;
+ } else {
+ if (e < 0) {
+ e = -e;
+ a = -a;
+ }
+ double t = round(a, 2);
+ // defined in TPC-H standard specification v2.18.0 section 2.1.3.5
+ failed = (e * 0.99 > t || e * 1.01 < t);
+ }
+ } catch (NumberFormatException nfe2) {
+ failed = !expected[i].trim().equals(actual[i].trim());
+ }
+ }
+ if (failed) {
+ System.out.println("Incorrect result on line " + actualLineNum + " column " + (i + 1) +
+ "! Expecting " + expected[i] + ", but found " + actual[i] + ".");
+ System.exit(1);
+ }
+ }
+ }
+
+ while (expectedReader.readLine() != null) {
+ expectedLineNum++;
+ }
+ while (actualReader.readLine() != null) {
+ actualLineNum++;
+ }
+ if (expectedLineNum != actualLineNum) {
+ System.out.println(
+ "Incorrect number of lines! Expecting " + expectedLineNum +
+ " lines, but found " + actualLineNum + " lines.");
+ System.exit(1);
+ }
+ }
+ }
+
+ /**
+ * Rounding function defined in TPC-H standard specification v2.18.0 chapter 10.
+ */
+ private static double round(double x, int m) {
+ if (x < 0) {
+ throw new IllegalArgumentException("x must be non-negative");
+ }
+ double y = x + 5 * Math.pow(10, -m - 1);
+ double z = y * Math.pow(10, m);
+ double q = Math.floor(z);
+ return q / Math.pow(10, m);
+ }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index d676291..85af090 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,6 +68,7 @@ under the License.
<module>flink-streaming-kafka011-test</module>
<module>flink-streaming-kafka010-test</module>
<module>flink-plugins-test</module>
+ <module>flink-tpch-test</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b8a1a40..d50b328 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -152,6 +152,8 @@ run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scrip
run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
+run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
+
run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" "skip_check_exceptions"
run_test "ConnectedComponents iterations with high parallelism end-to-end test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25"
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql
new file mode 100644
index 0000000..be2cd09
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql
@@ -0,0 +1,30 @@
+-- database: presto; groups: tpch; tables: partsupp,supplier,nation
+-- Modified TPC-H Q11: the alias "value" is quoted with backticks in the select list and in
+-- ORDER BY. Each original line is kept as a comment directly above its replacement.
+-- NOTE(review): presumably required because VALUE is a reserved keyword in Flink SQL; confirm.
+SELECT
+ ps_partkey,
+-- sum(ps_supplycost * ps_availqty) AS value
+ sum(ps_supplycost * ps_availqty) AS `value`
+FROM
+ partsupp,
+ supplier,
+ nation
+WHERE
+ ps_suppkey = s_suppkey
+ AND s_nationkey = n_nationkey
+ AND n_name = 'GERMANY'
+GROUP BY
+ ps_partkey
+HAVING
+ sum(ps_supplycost * ps_availqty) > (
+ SELECT sum(ps_supplycost * ps_availqty) * 0.0001
+ FROM
+ partsupp,
+ supplier,
+ nation
+ WHERE
+ ps_suppkey = s_suppkey
+ AND s_nationkey = n_nationkey
+ AND n_name = 'GERMANY'
+ )
+ORDER BY
+-- value DESC
+ `value` DESC
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql
new file mode 100644
index 0000000..8182b3e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql
@@ -0,0 +1,69 @@
+-- database: presto; groups: tpch; tables: lineitem,supplier
+-- Modified TPC-H Q15. The original formulation, kept as a comment below, creates a view named
+-- "revenue" and references it twice. The active query at the bottom inlines the view's SELECT
+-- as a derived table aliased "revenue" in both places.
+-- CREATE OR REPLACE VIEW revenue AS
+-- SELECT
+-- l_suppkey AS supplier_no,
+-- sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+-- FROM
+-- lineitem
+-- WHERE
+-- l_shipdate >= DATE '1996-01-01'
+-- AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+-- GROUP BY
+-- l_suppkey;
+--
+-- SELECT
+-- s_suppkey,
+-- s_name,
+-- s_address,
+-- s_phone,
+-- total_revenue
+-- FROM
+-- supplier,
+-- revenue
+-- WHERE
+-- s_suppkey = supplier_no
+-- AND total_revenue = (
+-- SELECT max(total_revenue)
+-- FROM
+-- revenue
+-- )
+-- ORDER BY
+-- s_suppkey;
+-- Blink does not support view
+
+SELECT
+ s_suppkey,
+ s_name,
+ s_address,
+ s_phone,
+ total_revenue
+FROM
+ supplier, (
+ SELECT
+ l_suppkey AS supplier_no,
+ sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+ FROM
+ lineitem
+ WHERE
+ l_shipdate >= DATE '1996-01-01'
+ AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+ GROUP BY
+ l_suppkey) AS revenue
+WHERE
+ s_suppkey = supplier_no
+ AND total_revenue = (
+ SELECT max(total_revenue)
+ FROM (
+ SELECT
+ l_suppkey AS supplier_no,
+ sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+ FROM
+ lineitem
+ WHERE
+ l_shipdate >= DATE '1996-01-01'
+ AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+ GROUP BY
+ l_suppkey) AS revenue
+ )
+ORDER BY
+ s_suppkey;
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql
new file mode 100644
index 0000000..7445398
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql
@@ -0,0 +1,36 @@
+-- database: presto; groups: tpch; tables: supplier,nation,partsupp,lineitem,part
+-- Modified TPC-H Q20: the ship date bounds use the literal form date '...' instead of the
+-- function-call form date('...'). The original lines are kept as comments above their replacements.
+SELECT
+ s_name,
+ s_address
+FROM
+ supplier, nation
+WHERE
+ s_suppkey IN (
+ SELECT ps_suppkey
+ FROM
+ partsupp
+ WHERE
+ ps_partkey IN (
+ SELECT p_partkey
+ FROM
+ part
+ WHERE
+ p_name LIKE 'forest%'
+ )
+ AND ps_availqty > (
+ SELECT 0.5 * sum(l_quantity)
+ FROM
+ lineitem
+ WHERE
+ l_partkey = ps_partkey
+ AND l_suppkey = ps_suppkey
+ -- AND l_shipdate >= date('1994-01-01')
+ -- AND l_shipdate < date('1994-01-01') + interval '1' YEAR
+ -- Blink does not support the above format
+ AND l_shipdate >= date '1994-01-01'
+ AND l_shipdate < date '1994-01-01' + interval '1' YEAR
+)
+)
+AND s_nationkey = n_nationkey
+AND n_name = 'CANADA'
+ORDER BY s_name
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql
new file mode 100644
index 0000000..28fb52e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql
@@ -0,0 +1,11 @@
+-- database: presto; groups: tpch; tables: lineitem
+-- Modified TPC-H Q6: the BETWEEN bounds on l_discount use plain numeric literals instead of
+-- decimal '...' literals. The original line is kept as a comment above its replacement.
+SELECT sum(l_extendedprice * l_discount) AS revenue
+FROM
+ lineitem
+WHERE
+ l_shipdate >= DATE '1994-01-01'
+ AND l_shipdate < DATE '1994-01-01' + INTERVAL '1' YEAR
+-- AND l_discount BETWEEN decimal '0.06' - decimal '0.01' AND decimal '0.06' + decimal '0.01'
+-- Blink currently does not support the above feature
+AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+AND l_quantity < 24
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml
new file mode 100644
index 0000000..be89601
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml
@@ -0,0 +1,51 @@
+tables:
+ - name: q1
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q1.csv"
+ format:
+ type: csv
+ fields:
+ - name: l_returnflag
+ type: VARCHAR
+ - name: l_linestatus
+ type: VARCHAR
+ - name: sum_qty
+ type: DOUBLE
+ - name: sum_base_price
+ type: DOUBLE
+ - name: sum_disc_price
+ type: DOUBLE
+ - name: sum_charge
+ type: DOUBLE
+ - name: avg_qty
+ type: DOUBLE
+ - name: avg_price
+ type: DOUBLE
+ - name: avg_disc
+ type: DOUBLE
+ - name: count_order
+ type: BIGINT
+ field-delimiter: "|"
+ schema:
+ - name: l_returnflag
+ type: VARCHAR
+ - name: l_linestatus
+ type: VARCHAR
+ - name: sum_qty
+ type: DOUBLE
+ - name: sum_base_price
+ type: DOUBLE
+ - name: sum_disc_price
+ type: DOUBLE
+ - name: sum_charge
+ type: DOUBLE
+ - name: avg_qty
+ type: DOUBLE
+ - name: avg_price
+ type: DOUBLE
+ - name: avg_disc
+ type: DOUBLE
+ - name: count_order
+ type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml
new file mode 100644
index 0000000..3fcbb7d
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml
@@ -0,0 +1,43 @@
+tables:
+ - name: q10
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q10.csv"
+ format:
+ type: csv
+ fields:
+ - name: c_custkey
+ type: BIGINT
+ - name: c_name
+ type: VARCHAR
+ - name: revenue
+ type: DOUBLE
+ - name: c_acctbal
+ type: DOUBLE
+ - name: n_name
+ type: VARCHAR
+ - name: c_address
+ type: VARCHAR
+ - name: c_phone
+ type: VARCHAR
+ - name: c_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ schema:
+ - name: c_custkey
+ type: BIGINT
+ - name: c_name
+ type: VARCHAR
+ - name: revenue
+ type: DOUBLE
+ - name: c_acctbal
+ type: DOUBLE
+ - name: n_name
+ type: VARCHAR
+ - name: c_address
+ type: VARCHAR
+ - name: c_phone
+ type: VARCHAR
+ - name: c_comment
+ type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml
new file mode 100644
index 0000000..c729305
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q11
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q11.csv"
+ format:
+ type: csv
+ fields:
+ - name: ps_partkey
+ type: BIGINT
+ - name: value
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: ps_partkey
+ type: BIGINT
+ - name: value
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml
new file mode 100644
index 0000000..d990cf0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml
@@ -0,0 +1,23 @@
+tables:
+ - name: q12
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q12.csv"
+ format:
+ type: csv
+ fields:
+ - name: l_shipmode
+ type: VARCHAR
+ - name: high_line_count
+ type: INT
+ - name: low_line_count
+ type: INT
+ field-delimiter: "|"
+ schema:
+ - name: l_shipmode
+ type: VARCHAR
+ - name: high_line_count
+ type: INT
+ - name: low_line_count
+ type: INT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml
new file mode 100644
index 0000000..d3c30f9
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q13
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q13.csv"
+ format:
+ type: csv
+ fields:
+ - name: c_count
+ type: BIGINT
+ - name: custdist
+ type: BIGINT
+ field-delimiter: "|"
+ schema:
+ - name: c_count
+ type: BIGINT
+ - name: custdist
+ type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml
new file mode 100644
index 0000000..551339b
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml
@@ -0,0 +1,15 @@
+tables:
+ - name: q14
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q14.csv"
+ format:
+ type: csv
+ fields:
+ - name: promo_revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: promo_revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml
new file mode 100644
index 0000000..069757a
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml
@@ -0,0 +1,31 @@
+tables:
+ - name: q15
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q15.csv"
+ format:
+ type: csv
+ fields:
+ - name: s_suppkey
+ type: BIGINT
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_phone
+ type: VARCHAR
+ - name: total_revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: s_suppkey
+ type: BIGINT
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_phone
+ type: VARCHAR
+ - name: total_revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml
new file mode 100644
index 0000000..95b2293
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml
@@ -0,0 +1,27 @@
+tables:
+ - name: q16
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q16.csv"
+ format:
+ type: csv
+ fields:
+ - name: p_brand
+ type: VARCHAR
+ - name: p_type
+ type: VARCHAR
+ - name: p_size
+ type: INT
+ - name: supplier_cnt
+ type: BIGINT
+ field-delimiter: "|"
+ schema:
+ - name: p_brand
+ type: VARCHAR
+ - name: p_type
+ type: VARCHAR
+ - name: p_size
+ type: INT
+ - name: supplier_cnt
+ type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml
new file mode 100644
index 0000000..bf618a7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml
@@ -0,0 +1,15 @@
+tables:
+ - name: q17
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q17.csv"
+ format:
+ type: csv
+ fields:
+ - name: avg_yearly
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: avg_yearly
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml
new file mode 100644
index 0000000..303c8b4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml
@@ -0,0 +1,35 @@
+tables:
+ - name: q18
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q18.csv"
+ format:
+ type: csv
+ fields:
+ - name: c_name
+ type: VARCHAR
+ - name: c_custkey
+ type: BIGINT
+ - name: o_orderkey
+ type: BIGINT
+ - name: o_orderdate
+ type: DATE
+ - name: o_totalprice
+ type: DOUBLE
+ - name: sum(l_quantity)
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: c_name
+ type: VARCHAR
+ - name: c_custkey
+ type: BIGINT
+ - name: o_orderkey
+ type: BIGINT
+ - name: o_orderdate
+ type: DATE
+ - name: o_totalprice
+ type: DOUBLE
+ - name: sum(l_quantity)
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml
new file mode 100644
index 0000000..f84d177
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml
@@ -0,0 +1,15 @@
+tables:
+ - name: q19
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q19.csv"
+ format:
+ type: csv
+ fields:
+ - name: revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml
new file mode 100644
index 0000000..17f58b1
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml
@@ -0,0 +1,43 @@
+tables:
+ - name: q2
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q2.csv"
+ format:
+ type: csv
+ fields:
+ - name: s_acctbal
+ type: DOUBLE
+ - name: s_name
+ type: VARCHAR
+ - name: n_name
+ type: VARCHAR
+ - name: p_partkey
+ type: BIGINT
+ - name: p_mfgr
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_phone
+ type: VARCHAR
+ - name: s_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ schema:
+ - name: s_acctbal
+ type: DOUBLE
+ - name: s_name
+ type: VARCHAR
+ - name: n_name
+ type: VARCHAR
+ - name: p_partkey
+ type: BIGINT
+ - name: p_mfgr
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_phone
+ type: VARCHAR
+ - name: s_comment
+ type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml
new file mode 100644
index 0000000..ca58a53
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q20
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q20.csv"
+ format:
+ type: csv
+ fields:
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ field-delimiter: "|"
+ schema:
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml
new file mode 100644
index 0000000..760573f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q21
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q21.csv"
+ format:
+ type: csv
+ fields:
+ - name: s_name
+ type: VARCHAR
+ - name: numwait
+ type: BIGINT
+ field-delimiter: "|"
+ schema:
+ - name: s_name
+ type: VARCHAR
+ - name: numwait
+ type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml
new file mode 100644
index 0000000..f39035e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml
@@ -0,0 +1,23 @@
+tables:
+ - name: q22
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q22.csv"
+ format:
+ type: csv
+ fields:
+ - name: cntrycode
+ type: VARCHAR
+ - name: numcust
+ type: BIGINT
+ - name: totacctbal
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: cntrycode
+ type: VARCHAR
+ - name: numcust
+ type: BIGINT
+ - name: totacctbal
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml
new file mode 100644
index 0000000..8617e50
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml
@@ -0,0 +1,27 @@
+tables:
+ - name: q3
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q3.csv"
+ format:
+ type: csv
+ fields:
+ - name: l_orderkey
+ type: BIGINT
+ - name: revenue
+ type: DOUBLE
+ - name: o_orderdate
+ type: DATE
+ - name: o_shippriority
+ type: INT
+ field-delimiter: "|"
+ schema:
+ - name: l_orderkey
+ type: BIGINT
+ - name: revenue
+ type: DOUBLE
+ - name: o_orderdate
+ type: DATE
+ - name: o_shippriority
+ type: INT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml
new file mode 100644
index 0000000..1a05c98
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q4
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q4.csv"
+ format:
+ type: csv
+ fields:
+ - name: o_orderpriority
+ type: VARCHAR
+ - name: order_count
+ type: BIGINT
+ field-delimiter: "|"
+ schema:
+ - name: o_orderpriority
+ type: VARCHAR
+ - name: order_count
+ type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml
new file mode 100644
index 0000000..845b131
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml
@@ -0,0 +1,19 @@
+tables:
+ - name: q5
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q5.csv"
+ format:
+ type: csv
+ fields:
+ - name: n_name
+ type: VARCHAR
+ - name: revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: n_name
+ type: VARCHAR
+ - name: revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml
new file mode 100644
index 0000000..3c0612f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml
@@ -0,0 +1,15 @@
+tables: # test_tpch.sh drops this first line (tail -n +2) so the entry below extends the source tables list
+ - name: q6 # sink table targeted by the "INSERT INTO q6 ..." statement that test_tpch.sh builds
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q6.csv" # test_tpch.sh replaces the RESULT_DIR placeholder with the real result directory
+ format:
+ type: csv
+ fields: # the same column is declared again under "schema" below
+ - name: revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml
new file mode 100644
index 0000000..d11da65
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml
@@ -0,0 +1,27 @@
+tables: # test_tpch.sh drops this first line (tail -n +2) so the entry below extends the source tables list
+ - name: q7 # sink table targeted by the "INSERT INTO q7 ..." statement that test_tpch.sh builds
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q7.csv" # test_tpch.sh replaces the RESULT_DIR placeholder with the real result directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: supp_nation
+ type: VARCHAR
+ - name: cust_nation
+ type: VARCHAR
+ - name: l_year
+ type: BIGINT
+ - name: revenue
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: supp_nation
+ type: VARCHAR
+ - name: cust_nation
+ type: VARCHAR
+ - name: l_year
+ type: BIGINT
+ - name: revenue
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml
new file mode 100644
index 0000000..88b48df
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml
@@ -0,0 +1,19 @@
+tables: # test_tpch.sh drops this first line (tail -n +2) so the entry below extends the source tables list
+ - name: q8 # sink table targeted by the "INSERT INTO q8 ..." statement that test_tpch.sh builds
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q8.csv" # test_tpch.sh replaces the RESULT_DIR placeholder with the real result directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: o_year
+ type: BIGINT
+ - name: mkt_share
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: o_year
+ type: BIGINT
+ - name: mkt_share
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml
new file mode 100644
index 0000000..b2030c7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml
@@ -0,0 +1,23 @@
+tables: # test_tpch.sh drops this first line (tail -n +2) so the entry below extends the source tables list
+ - name: q9 # sink table targeted by the "INSERT INTO q9 ..." statement that test_tpch.sh builds
+ type: sink-table
+ connector:
+ type: filesystem
+ path: "$RESULT_DIR/q9.csv" # test_tpch.sh replaces the RESULT_DIR placeholder with the real result directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: nation
+ type: VARCHAR
+ - name: o_year
+ type: BIGINT
+ - name: sum_profit
+ type: DOUBLE
+ field-delimiter: "|"
+ schema:
+ - name: nation
+ type: VARCHAR
+ - name: o_year
+ type: BIGINT
+ - name: sum_profit
+ type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml
new file mode 100644
index 0000000..83c1558
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml
@@ -0,0 +1,349 @@
+tables: # source tables; test_tpch.sh puts this file's content at the top of every per-query session config
+ - name: customer
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/customer.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: c_custkey
+ type: BIGINT
+ - name: c_name
+ type: VARCHAR
+ - name: c_address
+ type: VARCHAR
+ - name: c_nationkey
+ type: BIGINT
+ - name: c_phone
+ type: VARCHAR
+ - name: c_acctbal
+ type: DOUBLE
+ - name: c_mktsegment
+ type: VARCHAR
+ - name: c_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: c_custkey
+ type: BIGINT
+ - name: c_name
+ type: VARCHAR
+ - name: c_address
+ type: VARCHAR
+ - name: c_nationkey
+ type: BIGINT
+ - name: c_phone
+ type: VARCHAR
+ - name: c_acctbal
+ type: DOUBLE
+ - name: c_mktsegment
+ type: VARCHAR
+ - name: c_comment
+ type: VARCHAR
+ - name: lineitem
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/lineitem.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: l_orderkey
+ type: BIGINT
+ - name: l_partkey
+ type: BIGINT
+ - name: l_suppkey
+ type: BIGINT
+ - name: l_linenumber
+ type: INT
+ - name: l_quantity
+ type: DOUBLE
+ - name: l_extendedprice
+ type: DOUBLE
+ - name: l_discount
+ type: DOUBLE
+ - name: l_tax
+ type: DOUBLE
+ - name: l_returnflag
+ type: VARCHAR
+ - name: l_linestatus
+ type: VARCHAR
+ - name: l_shipdate
+ type: DATE
+ - name: l_commitdate
+ type: DATE
+ - name: l_receiptdate
+ type: DATE
+ - name: l_shipinstruct
+ type: VARCHAR
+ - name: l_shipmode
+ type: VARCHAR
+ - name: l_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: l_orderkey
+ type: BIGINT
+ - name: l_partkey
+ type: BIGINT
+ - name: l_suppkey
+ type: BIGINT
+ - name: l_linenumber
+ type: INT
+ - name: l_quantity
+ type: DOUBLE
+ - name: l_extendedprice
+ type: DOUBLE
+ - name: l_discount
+ type: DOUBLE
+ - name: l_tax
+ type: DOUBLE
+ - name: l_returnflag
+ type: VARCHAR
+ - name: l_linestatus
+ type: VARCHAR
+ - name: l_shipdate
+ type: DATE
+ - name: l_commitdate
+ type: DATE
+ - name: l_receiptdate
+ type: DATE
+ - name: l_shipinstruct
+ type: VARCHAR
+ - name: l_shipmode
+ type: VARCHAR
+ - name: l_comment
+ type: VARCHAR
+ - name: nation
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/nation.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: n_nationkey
+ type: BIGINT
+ - name: n_name
+ type: VARCHAR
+ - name: n_regionkey
+ type: BIGINT
+ - name: n_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: n_nationkey
+ type: BIGINT
+ - name: n_name
+ type: VARCHAR
+ - name: n_regionkey
+ type: BIGINT
+ - name: n_comment
+ type: VARCHAR
+ - name: orders
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/orders.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: o_orderkey
+ type: BIGINT
+ - name: o_custkey
+ type: BIGINT
+ - name: o_orderstatus
+ type: VARCHAR
+ - name: o_totalprice
+ type: DOUBLE
+ - name: o_orderdate
+ type: DATE
+ - name: o_orderpriority
+ type: VARCHAR
+ - name: o_clerk
+ type: VARCHAR
+ - name: o_shippriority
+ type: INT
+ - name: o_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: o_orderkey
+ type: BIGINT
+ - name: o_custkey
+ type: BIGINT
+ - name: o_orderstatus
+ type: VARCHAR
+ - name: o_totalprice
+ type: DOUBLE
+ - name: o_orderdate
+ type: DATE
+ - name: o_orderpriority
+ type: VARCHAR
+ - name: o_clerk
+ type: VARCHAR
+ - name: o_shippriority
+ type: INT
+ - name: o_comment
+ type: VARCHAR
+ - name: part
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/part.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: p_partkey
+ type: BIGINT
+ - name: p_name
+ type: VARCHAR
+ - name: p_mfgr
+ type: VARCHAR
+ - name: p_brand
+ type: VARCHAR
+ - name: p_type
+ type: VARCHAR
+ - name: p_size
+ type: INT
+ - name: p_container
+ type: VARCHAR
+ - name: p_retailprice
+ type: DOUBLE
+ - name: p_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: p_partkey
+ type: BIGINT
+ - name: p_name
+ type: VARCHAR
+ - name: p_mfgr
+ type: VARCHAR
+ - name: p_brand
+ type: VARCHAR
+ - name: p_type
+ type: VARCHAR
+ - name: p_size
+ type: INT
+ - name: p_container
+ type: VARCHAR
+ - name: p_retailprice
+ type: DOUBLE
+ - name: p_comment
+ type: VARCHAR
+ - name: partsupp
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/partsupp.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: ps_partkey
+ type: BIGINT
+ - name: ps_suppkey
+ type: BIGINT
+ - name: ps_availqty
+ type: INT
+ - name: ps_supplycost
+ type: DOUBLE
+ - name: ps_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: ps_partkey
+ type: BIGINT
+ - name: ps_suppkey
+ type: BIGINT
+ - name: ps_availqty
+ type: INT
+ - name: ps_supplycost
+ type: DOUBLE
+ - name: ps_comment
+ type: VARCHAR
+ - name: region
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/region.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: r_regionkey
+ type: BIGINT
+ - name: r_name
+ type: VARCHAR
+ - name: r_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: r_regionkey
+ type: BIGINT
+ - name: r_name
+ type: VARCHAR
+ - name: r_comment
+ type: VARCHAR
+ - name: supplier
+ type: source-table
+ update-mode: append
+ connector:
+ type: filesystem
+ path: "$TABLE_DIR/supplier.csv" # test_tpch.sh replaces the TABLE_DIR placeholder with the real table directory
+ format:
+ type: csv
+ fields: # the same columns, in the same order, are declared again under "schema" below
+ - name: s_suppkey
+ type: BIGINT
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_nationkey
+ type: BIGINT
+ - name: s_phone
+ type: VARCHAR
+ - name: s_acctbal
+ type: DOUBLE
+ - name: s_comment
+ type: VARCHAR
+ field-delimiter: "|"
+ line-delimiter: "\n"
+ comment-prefix: "--"
+ schema:
+ - name: s_suppkey
+ type: BIGINT
+ - name: s_name
+ type: VARCHAR
+ - name: s_address
+ type: VARCHAR
+ - name: s_nationkey
+ type: BIGINT
+ - name: s_phone
+ type: VARCHAR
+ - name: s_acctbal
+ type: DOUBLE
+ - name: s_comment
+ type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test_tpch.sh b/flink-end-to-end-tests/test-scripts/test_tpch.sh
new file mode 100755
index 0000000..ab4c61c
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_tpch.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Runs TPC-H queries q1..q22 through the SQL client (blink planner, batch mode) and hands each result file to TpchResultComparator.
+set -Eeuo pipefail # abort on command errors, unset variables and failed pipeline stages; -E makes functions inherit ERR traps
+
+SCALE="0.01" # first argument passed to TpchDataGenerator below
+
+source "$(dirname "$0")"/common.sh # presumably defines END_TO_END_DIR, TEST_DATA_DIR, FLINK_DIR and the helper functions used below - TODO confirm
+
+################################################################################
+# Generate test data
+################################################################################
+
+echo "Generating test data..."
+
+TARGET_DIR="$END_TO_END_DIR/flink-tpch-test/target"
+TPCH_DATA_DIR="$END_TO_END_DIR/test-scripts/test-data/tpch"
+java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpch.TpchDataGenerator "$SCALE" "$TARGET_DIR" # NOTE(review): jar name hard-codes the project version; the table/, query/ and expected/ dirs read below are presumably written by this step
+
+################################################################################
+# Prepare Flink
+################################################################################
+
+echo "Preparing Flink..."
+
+start_cluster
+
+################################################################################
+# Run SQL statements
+################################################################################
+
+TABLE_DIR="$TARGET_DIR/table"
+ORIGIN_QUERY_DIR="$TARGET_DIR/query"
+MODIFIED_QUERY_DIR="$TPCH_DATA_DIR/modified-query"
+EXPECTED_DIR="$TARGET_DIR/expected"
+RESULT_DIR="$TEST_DATA_DIR/result"
+SQL_CONF="$TEST_DATA_DIR/sql-client-session.conf" # rewritten from scratch for every query in the loop below
+
+mkdir "$RESULT_DIR" # no -p: fails (and aborts the script under set -e) if the directory already exists
+
+SOURCES_YAML=$(cat "$TPCH_DATA_DIR/source.yaml")
+SOURCES_YAML=${SOURCES_YAML//\$TABLE_DIR/"$TABLE_DIR"} # replace every literal $TABLE_DIR placeholder with the real path
+
+for i in {1..22} # one iteration per query q1..q22
+do
+ echo "Running query #$i..."
+
+ # Skip the sink yaml's first line ("tables:") so its table entry continues the list started by source.yaml
+ SINK_YAML=$(tail -n +2 "$TPCH_DATA_DIR/sink/q${i}.yaml")
+ SINK_YAML=${SINK_YAML//\$RESULT_DIR/"$RESULT_DIR"} # replace every literal $RESULT_DIR placeholder with the real path
+
+ cat > "$SQL_CONF" << EOF
+${SOURCES_YAML}
+${SINK_YAML}
+execution:
+ planner: blink
+ type: batch
+ result-mode: table
+EOF
+
+ if [[ -e "$MODIFIED_QUERY_DIR/q$i.sql" ]] # a checked-in modified query takes precedence over the one in $ORIGIN_QUERY_DIR
+ then
+ SQL_STATEMENT="INSERT INTO q$i $(cat "$MODIFIED_QUERY_DIR/q$i.sql")"
+ else
+ SQL_STATEMENT="INSERT INTO q$i $(cat "$ORIGIN_QUERY_DIR/q$i.sql")"
+ fi
+
+ JOB_ID=$("$FLINK_DIR/bin/sql-client.sh" embedded \
+ --environment "$SQL_CONF" \
+ --update "$SQL_STATEMENT" | grep "Job ID:" | sed 's/.* //g')
+
+ wait_job_terminal_state "$JOB_ID" "FINISHED" # JOB_ID is the text after the last space on the client's "Job ID:" output line
+
+ java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar" org.apache.flink.table.tpch.TpchResultComparator "$EXPECTED_DIR/q$i.csv" "$RESULT_DIR/q$i.csv" # arguments: expected file, then actual file; presumably exits non-zero on mismatch - TODO confirm
+done
diff --git a/pom.xml b/pom.xml
index 44e21ac..ef17b18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1403,7 +1403,7 @@ under the License.
<exclude>flink-table/flink-table-planner-blink/src/test/resources/digest/*.out</exclude>
<exclude>flink-table/flink-table-planner-blink/src/test/resources/explain/*.out</exclude>
<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
- <exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude>
+ <exclude>flink-end-to-end-tests/test-scripts/test-data/**</exclude>
<exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude>
<exclude>flink-connectors/flink-connector-kafka/src/test/resources/**</exclude>
<exclude>flink-connectors/flink-connector-kafka-0.11/src/test/resources/**</exclude>