Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 03:52:13 UTC

[flink] 02/02: [FLINK-13436][e2e] Add TPC-H queries as E2E tests

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 274ff58e52c826c5f358ddbc539c96de4d8af801
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Aug 1 15:55:18 2019 +0800

    [FLINK-13436][e2e] Add TPC-H queries as E2E tests
    
    This closes #9312
---
 flink-end-to-end-tests/flink-tpch-test/pom.xml     |  60 ++++
 .../apache/flink/table/tpch/TpchDataGenerator.java | 127 ++++++++
 .../flink/table/tpch/TpchResultComparator.java     | 123 ++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-data/tpch/modified-query/q11.sql          |  30 ++
 .../test-data/tpch/modified-query/q15.sql          |  69 ++++
 .../test-data/tpch/modified-query/q20.sql          |  36 +++
 .../test-data/tpch/modified-query/q6.sql           |  11 +
 .../test-scripts/test-data/tpch/sink/q1.yaml       |  51 +++
 .../test-scripts/test-data/tpch/sink/q10.yaml      |  43 +++
 .../test-scripts/test-data/tpch/sink/q11.yaml      |  19 ++
 .../test-scripts/test-data/tpch/sink/q12.yaml      |  23 ++
 .../test-scripts/test-data/tpch/sink/q13.yaml      |  19 ++
 .../test-scripts/test-data/tpch/sink/q14.yaml      |  15 +
 .../test-scripts/test-data/tpch/sink/q15.yaml      |  31 ++
 .../test-scripts/test-data/tpch/sink/q16.yaml      |  27 ++
 .../test-scripts/test-data/tpch/sink/q17.yaml      |  15 +
 .../test-scripts/test-data/tpch/sink/q18.yaml      |  35 +++
 .../test-scripts/test-data/tpch/sink/q19.yaml      |  15 +
 .../test-scripts/test-data/tpch/sink/q2.yaml       |  43 +++
 .../test-scripts/test-data/tpch/sink/q20.yaml      |  19 ++
 .../test-scripts/test-data/tpch/sink/q21.yaml      |  19 ++
 .../test-scripts/test-data/tpch/sink/q22.yaml      |  23 ++
 .../test-scripts/test-data/tpch/sink/q3.yaml       |  27 ++
 .../test-scripts/test-data/tpch/sink/q4.yaml       |  19 ++
 .../test-scripts/test-data/tpch/sink/q5.yaml       |  19 ++
 .../test-scripts/test-data/tpch/sink/q6.yaml       |  15 +
 .../test-scripts/test-data/tpch/sink/q7.yaml       |  27 ++
 .../test-scripts/test-data/tpch/sink/q8.yaml       |  19 ++
 .../test-scripts/test-data/tpch/sink/q9.yaml       |  23 ++
 .../test-scripts/test-data/tpch/source.yaml        | 349 +++++++++++++++++++++
 flink-end-to-end-tests/test-scripts/test_tpch.sh   |  91 ++++++
 pom.xml                                            |   2 +-
 34 files changed, 1446 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-tpch-test/pom.xml b/flink-end-to-end-tests/flink-tpch-test/pom.xml
new file mode 100644
index 0000000..e317f31
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.10-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-tpch-test</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>io.airlift.tpch</groupId>
+			<artifactId>tpch</artifactId>
+			<version>0.10</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.10</version>
+				<executions>
+					<execution>
+						<id>copy-dependencies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>copy-dependencies</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/lib</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java
new file mode 100644
index 0000000..4646a02
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.tpch;
+
+import io.airlift.tpch.TpchEntity;
+import io.airlift.tpch.TpchTable;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+/**
+ * TPC-H test data generator.
+ */
+public class TpchDataGenerator {
+
+	public static final int QUERY_NUM = 22;
+
+	public static void main(String[] args) throws IOException {
+		if (args.length != 2) {
+			System.out.println("Exactly 1 double value and 1 path should be provided as argument");
+			return;
+		}
+
+		double scale = Double.valueOf(args[0]);
+		String path = args[1];
+		generateTable(scale, path);
+		generateQuery(path);
+		generateExpected(path);
+	}
+
+	private static void generateTable(double scale, String path) throws IOException {
+		File dir = new File(path + "/table");
+		dir.mkdir();
+
+		for (TpchTable table : TpchTable.getTables()) {
+			Iterable generator = table.createGenerator(scale, 1, 1);
+
+			StringBuilder builder = new StringBuilder();
+			generator.forEach(s -> {
+				String line = ((TpchEntity) s).toLine().trim();
+				if (line.endsWith("|")) {
+					line = line.substring(0, line.length() - 1);
+				}
+				builder.append(line).append('\n');
+			});
+
+			try (BufferedWriter writer = new BufferedWriter(
+				new OutputStreamWriter(
+					new FileOutputStream(path + "/table/" + table.getTableName() + ".csv")))
+			) {
+				writer.write(builder.toString());
+			}
+		}
+	}
+
+	private static void generateQuery(String path) throws IOException {
+		File dir = new File(path + "/query");
+		dir.mkdir();
+
+		for (int i = 0; i < QUERY_NUM; i++) {
+			try (
+				InputStream in = TpchDataGenerator.class.getResourceAsStream(
+					"/io/airlift/tpch/queries/q" + (i + 1) + ".sql");
+				OutputStream out = new FileOutputStream(path + "/query/q" + (i + 1) + ".sql")
+			) {
+				byte[] buffer = new byte[4096];
+				int bytesRead = 0;
+				while ((bytesRead = in.read(buffer)) > 0) {
+					out.write(buffer, 0, bytesRead);
+				}
+			}
+		}
+	}
+
+	private static void generateExpected(String path) throws IOException {
+		File dir = new File(path + "/expected");
+		dir.mkdir();
+
+		for (int i = 0; i < QUERY_NUM; i++) {
+			try (
+				BufferedReader reader = new BufferedReader(
+					new InputStreamReader(TpchDataGenerator.class.getResourceAsStream(
+						"/io/airlift/tpch/queries/q" + (i + 1) + ".result")));
+				BufferedWriter writer = new BufferedWriter(
+					new OutputStreamWriter(
+						new FileOutputStream(path + "/expected/q" + (i + 1) + ".csv")))
+			) {
+				int lineNumber = 0;
+				String line;
+				while ((line = reader.readLine()) != null) {
+					line = line.trim().replace("null", "");
+					lineNumber++;
+					if (lineNumber == 1) {
+						continue;
+					}
+					if (line.length() > 0 && line.endsWith("|")) {
+						line = line.substring(0, line.length() - 1);
+					}
+					writer.write(line + "\n");
+				}
+			}
+		}
+	}
+}
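
For context, a minimal sketch (not part of the patch) of the io.airlift.tpch API that the generator above relies on, using only the calls already present in TpchDataGenerator (TpchTable.getTables(), createGenerator(scale, part, partCount), TpchEntity.toLine()); raw types are kept to mirror that code. Each generated line is pipe-delimited and ends with a trailing '|', which is why generateTable() strips it before writing the .csv source tables.

import io.airlift.tpch.TpchEntity;
import io.airlift.tpch.TpchTable;

/** Illustrative only: print the first three dbgen-style rows of every TPC-H table at scale 0.01. */
public class TpchGeneratorDemo {
    public static void main(String[] args) {
        for (TpchTable table : TpchTable.getTables()) {
            System.out.println("=== " + table.getTableName() + " ===");
            Iterable rows = table.createGenerator(0.01, 1, 1);
            int printed = 0;
            for (Object row : rows) {
                // e.g. a region row looks like "0|AFRICA|...|", with a trailing delimiter
                System.out.println(((TpchEntity) row).toLine());
                if (++printed == 3) {
                    break;
                }
            }
        }
    }
}
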
diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java
new file mode 100644
index 0000000..e9bbea3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.tpch;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+/**
+ * Result comparator for the TPC-H end-to-end test, following the TPC-H standard specification v2.18.0.
+ */
+public class TpchResultComparator {
+
+	public static void main(String[] args) throws IOException {
+		if (args.length != 2) {
+			System.out.println(
+				"Exactly 2 paths must be provided, the expected result path and the actual result path");
+			System.exit(1);
+		}
+
+		String expectedPath = args[0];
+		String actualPath = args[1];
+
+		try (
+			BufferedReader expectedReader = new BufferedReader(new FileReader(expectedPath));
+			BufferedReader actualReader = new BufferedReader(new FileReader(actualPath))
+		) {
+			int expectedLineNum = 0;
+			int actualLineNum = 0;
+
+			String expectedLine, actualLine;
+			while (
+				(expectedLine = expectedReader.readLine()) != null &&
+					(actualLine = actualReader.readLine()) != null
+			) {
+				String[] expected = expectedLine.split("\\|");
+				expectedLineNum++;
+				String[] actual = actualLine.split("\\|");
+				actualLineNum++;
+
+				if (expected.length != actual.length) {
+					System.out.println(
+						"Incorrect number of columns on line " + actualLineNum +
+							"! Expecting " + expected.length + " columns, but found " + actual.length + " columns.");
+					System.exit(1);
+				}
+				for (int i = 0; i < expected.length; i++) {
+					boolean failed;
+					try {
+						long e = Long.valueOf(expected[i]);
+						long a = Long.valueOf(actual[i]);
+						failed = (e != a);
+					} catch (NumberFormatException nfe) {
+						try {
+							double e = Double.valueOf(expected[i]);
+							double a = Double.valueOf(actual[i]);
+							if (e < 0 && a > 0 || e > 0 && a < 0) {
+								failed = true;
+							} else {
+								if (e < 0) {
+									e = -e;
+									a = -a;
+								}
+								double t = round(a, 2);
+								// defined in TPC-H standard specification v2.18.0 section 2.1.3.5
+								failed = (e * 0.99 > t || e * 1.01 < t);
+							}
+						} catch (NumberFormatException nfe2) {
+							failed = !expected[i].trim().equals(actual[i].trim());
+						}
+					}
+					if (failed) {
+						System.out.println("Incorrect result on line " + actualLineNum + " column " + (i + 1) +
+							"! Expecting " + expected[i] + ", but found " + actual[i] + ".");
+						System.exit(1);
+					}
+				}
+			}
+
+			while (expectedReader.readLine() != null) {
+				expectedLineNum++;
+			}
+			while (actualReader.readLine() != null) {
+				actualLineNum++;
+			}
+			if (expectedLineNum != actualLineNum) {
+				System.out.println(
+					"Incorrect number of lines! Expecting " + expectedLineNum +
+						" lines, but found " + actualLineNum + " lines.");
+				System.exit(1);
+			}
+		}
+	}
+
+	/**
+	 * Rounding function defined in TPC-H standard specification v2.18.0 chapter 10.
+	 */
+	private static double round(double x, int m) {
+		if (x < 0) {
+			throw new IllegalArgumentException("x must be non-negative");
+		}
+		double y = x + 5 * Math.pow(10, -m - 1);
+		double z = y * Math.pow(10, m);
+		double q = Math.floor(z);
+		return q / Math.pow(10, m);
+	}
+}
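
The acceptance rule coded above: after the sign normalization, the actual value is rounded to two decimal places with the chapter-10 rule round(x, m) = floor((x + 5 * 10^-(m+1)) * 10^m) / 10^m and must lie within 1% of the expected value (section 2.1.3.5); non-numeric cells must match exactly after trimming. A self-contained sketch (not part of the patch) that replays this check on made-up sample values:

/** Illustrative only: replays the numeric tolerance check used by TpchResultComparator. */
public class ToleranceCheckDemo {

    // Rounding rule from TPC-H spec v2.18.0 chapter 10, as in TpchResultComparator#round.
    static double round(double x, int m) {
        double y = x + 5 * Math.pow(10, -m - 1);
        return Math.floor(y * Math.pow(10, m)) / Math.pow(10, m);
    }

    // Acceptance rule from section 2.1.3.5: the rounded actual value must be within 1% of the expected one.
    static boolean withinTolerance(double expected, double actual) {
        if (expected < 0 && actual > 0 || expected > 0 && actual < 0) {
            return false;
        }
        if (expected < 0) {
            expected = -expected;
            actual = -actual;
        }
        double rounded = round(actual, 2);
        return expected * 0.99 <= rounded && rounded <= expected * 1.01;
    }

    public static void main(String[] args) {
        System.out.println(withinTolerance(100.0, 100.5)); // true: 100.5 lies within [99.0, 101.0]
        System.out.println(withinTolerance(100.0, 101.2)); // false: 101.2 exceeds 101.0
    }
}
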
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index d676291..85af090 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,6 +68,7 @@ under the License.
 		<module>flink-streaming-kafka011-test</module>
 		<module>flink-streaming-kafka010-test</module>
 		<module>flink-plugins-test</module>
+		<module>flink-tpch-test</module>
 	</modules>
 
 	<!-- See main pom.xml for explanation of profiles -->
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index b8a1a40..d50b328 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -152,6 +152,8 @@ run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scrip
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
 
+run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
+
 run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" "skip_check_exceptions"
 
 run_test "ConnectedComponents iterations with high parallelism end-to-end test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25"
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql
new file mode 100644
index 0000000..be2cd09
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql
@@ -0,0 +1,30 @@
+-- database: presto; groups: tpch; tables: partsupp,supplier,nation
+SELECT
+  ps_partkey,
+--  sum(ps_supplycost * ps_availqty) AS value
+  sum(ps_supplycost * ps_availqty) AS `value`
+FROM
+  partsupp,
+  supplier,
+  nation
+WHERE
+  ps_suppkey = s_suppkey
+  AND s_nationkey = n_nationkey
+  AND n_name = 'GERMANY'
+GROUP BY
+  ps_partkey
+HAVING
+  sum(ps_supplycost * ps_availqty) > (
+    SELECT sum(ps_supplycost * ps_availqty) * 0.0001
+    FROM
+      partsupp,
+      supplier,
+      nation
+    WHERE
+      ps_suppkey = s_suppkey
+      AND s_nationkey = n_nationkey
+      AND n_name = 'GERMANY'
+  )
+ORDER BY
+--  value DESC
+  `value` DESC
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql
new file mode 100644
index 0000000..8182b3e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql
@@ -0,0 +1,69 @@
+-- database: presto; groups: tpch; tables: lineitem,supplier
+-- CREATE OR REPLACE VIEW revenue AS
+--   SELECT
+--     l_suppkey AS supplier_no,
+--     sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+--   FROM
+--     lineitem
+--   WHERE
+--     l_shipdate >= DATE '1996-01-01'
+--     AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+-- GROUP BY
+--   l_suppkey;
+--
+-- SELECT
+--   s_suppkey,
+--   s_name,
+--   s_address,
+--   s_phone,
+--   total_revenue
+-- FROM
+--   supplier,
+--   revenue
+-- WHERE
+--   s_suppkey = supplier_no
+--   AND total_revenue = (
+--     SELECT max(total_revenue)
+--     FROM
+--       revenue
+--   )
+-- ORDER BY
+--   s_suppkey;
+-- Blink does not support view
+
+SELECT
+  s_suppkey,
+  s_name,
+  s_address,
+  s_phone,
+  total_revenue
+FROM
+  supplier, (
+  SELECT
+    l_suppkey AS supplier_no,
+    sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+  FROM
+    lineitem
+  WHERE
+    l_shipdate >= DATE '1996-01-01'
+    AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+  GROUP BY
+    l_suppkey) AS revenue
+WHERE
+  s_suppkey = supplier_no
+  AND total_revenue = (
+    SELECT max(total_revenue)
+    FROM (
+      SELECT
+        l_suppkey AS supplier_no,
+        sum(l_extendedprice * (1 - l_discount)) AS total_revenue
+      FROM
+        lineitem
+      WHERE
+        l_shipdate >= DATE '1996-01-01'
+        AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH
+      GROUP BY
+        l_suppkey) AS revenue
+  )
+ORDER BY
+  s_suppkey;
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql
new file mode 100644
index 0000000..7445398
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql
@@ -0,0 +1,36 @@
+-- database: presto; groups: tpch; tables: supplier,nation,partsupp,lineitem,part
+SELECT
+  s_name,
+  s_address
+FROM
+  supplier, nation
+WHERE
+  s_suppkey IN (
+    SELECT ps_suppkey
+    FROM
+      partsupp
+    WHERE
+      ps_partkey IN (
+        SELECT p_partkey
+        FROM
+          part
+        WHERE
+          p_name LIKE 'forest%'
+      )
+      AND ps_availqty > (
+        SELECT 0.5 * sum(l_quantity)
+        FROM
+          lineitem
+        WHERE
+          l_partkey = ps_partkey
+          AND l_suppkey = ps_suppkey
+          -- AND l_shipdate >= date('1994-01-01')
+          -- AND l_shipdate < date('1994-01-01') + interval '1' YEAR
+          -- Blink does not support the above format
+          AND l_shipdate >= date '1994-01-01'
+          AND l_shipdate < date '1994-01-01' + interval '1' YEAR
+)
+)
+AND s_nationkey = n_nationkey
+AND n_name = 'CANADA'
+ORDER BY s_name
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql
new file mode 100644
index 0000000..28fb52e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql
@@ -0,0 +1,11 @@
+-- database: presto; groups: tpch; tables: lineitem
+SELECT sum(l_extendedprice * l_discount) AS revenue
+FROM
+  lineitem
+WHERE
+  l_shipdate >= DATE '1994-01-01'
+  AND l_shipdate < DATE '1994-01-01' + INTERVAL '1' YEAR
+-- AND l_discount BETWEEN decimal '0.06' - decimal '0.01' AND decimal '0.06' + decimal '0.01'
+-- Blink currently does not support the above feature
+AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+AND l_quantity < 24
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml
new file mode 100644
index 0000000..be89601
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml
@@ -0,0 +1,51 @@
+tables:
+  - name: q1
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q1.csv"
+    format:
+      type: csv
+      fields:
+        - name: l_returnflag
+          type: VARCHAR
+        - name: l_linestatus
+          type: VARCHAR
+        - name: sum_qty
+          type: DOUBLE
+        - name: sum_base_price
+          type: DOUBLE
+        - name: sum_disc_price
+          type: DOUBLE
+        - name: sum_charge
+          type: DOUBLE
+        - name: avg_qty
+          type: DOUBLE
+        - name: avg_price
+          type: DOUBLE
+        - name: avg_disc
+          type: DOUBLE
+        - name: count_order
+          type: BIGINT
+      field-delimiter: "|"
+    schema:
+      - name: l_returnflag
+        type: VARCHAR
+      - name: l_linestatus
+        type: VARCHAR
+      - name: sum_qty
+        type: DOUBLE
+      - name: sum_base_price
+        type: DOUBLE
+      - name: sum_disc_price
+        type: DOUBLE
+      - name: sum_charge
+        type: DOUBLE
+      - name: avg_qty
+        type: DOUBLE
+      - name: avg_price
+        type: DOUBLE
+      - name: avg_disc
+        type: DOUBLE
+      - name: count_order
+        type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml
new file mode 100644
index 0000000..3fcbb7d
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml
@@ -0,0 +1,43 @@
+tables:
+  - name: q10
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q10.csv"
+    format:
+      type: csv
+      fields:
+        - name: c_custkey
+          type: BIGINT
+        - name: c_name
+          type: VARCHAR
+        - name: revenue
+          type: DOUBLE
+        - name: c_acctbal
+          type: DOUBLE
+        - name: n_name
+          type: VARCHAR
+        - name: c_address
+          type: VARCHAR
+        - name: c_phone
+          type: VARCHAR
+        - name: c_comment
+          type: VARCHAR
+      field-delimiter: "|"
+    schema:
+      - name: c_custkey
+        type: BIGINT
+      - name: c_name
+        type: VARCHAR
+      - name: revenue
+        type: DOUBLE
+      - name: c_acctbal
+        type: DOUBLE
+      - name: n_name
+        type: VARCHAR
+      - name: c_address
+        type: VARCHAR
+      - name: c_phone
+        type: VARCHAR
+      - name: c_comment
+        type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml
new file mode 100644
index 0000000..c729305
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q11
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q11.csv"
+    format:
+      type: csv
+      fields:
+        - name: ps_partkey
+          type: BIGINT
+        - name: value
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: ps_partkey
+        type: BIGINT
+      - name: value
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml
new file mode 100644
index 0000000..d990cf0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml
@@ -0,0 +1,23 @@
+tables:
+  - name: q12
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q12.csv"
+    format:
+      type: csv
+      fields:
+        - name: l_shipmode
+          type: VARCHAR
+        - name: high_line_count
+          type: INT
+        - name: low_line_count
+          type: INT
+      field-delimiter: "|"
+    schema:
+      - name: l_shipmode
+        type: VARCHAR
+      - name: high_line_count
+        type: INT
+      - name: low_line_count
+        type: INT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml
new file mode 100644
index 0000000..d3c30f9
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q13
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q13.csv"
+    format:
+      type: csv
+      fields:
+        - name: c_count
+          type: BIGINT
+        - name: custdist
+          type: BIGINT
+      field-delimiter: "|"
+    schema:
+      - name: c_count
+        type: BIGINT
+      - name: custdist
+        type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml
new file mode 100644
index 0000000..551339b
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml
@@ -0,0 +1,15 @@
+tables:
+  - name: q14
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q14.csv"
+    format:
+      type: csv
+      fields:
+        - name: promo_revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: promo_revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml
new file mode 100644
index 0000000..069757a
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml
@@ -0,0 +1,31 @@
+tables:
+  - name: q15
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q15.csv"
+    format:
+      type: csv
+      fields:
+        - name: s_suppkey
+          type: BIGINT
+        - name: s_name
+          type: VARCHAR
+        - name: s_address
+          type: VARCHAR
+        - name: s_phone
+          type: VARCHAR
+        - name: total_revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: s_suppkey
+        type: BIGINT
+      - name: s_name
+        type: VARCHAR
+      - name: s_address
+        type: VARCHAR
+      - name: s_phone
+        type: VARCHAR
+      - name: total_revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml
new file mode 100644
index 0000000..95b2293
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml
@@ -0,0 +1,27 @@
+tables:
+  - name: q16
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q16.csv"
+    format:
+      type: csv
+      fields:
+        - name: p_brand
+          type: VARCHAR
+        - name: p_type
+          type: VARCHAR
+        - name: p_size
+          type: INT
+        - name: supplier_cnt
+          type: BIGINT
+      field-delimiter: "|"
+    schema:
+      - name: p_brand
+        type: VARCHAR
+      - name: p_type
+        type: VARCHAR
+      - name: p_size
+        type: INT
+      - name: supplier_cnt
+        type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml
new file mode 100644
index 0000000..bf618a7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml
@@ -0,0 +1,15 @@
+tables:
+  - name: q17
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q17.csv"
+    format:
+      type: csv
+      fields:
+        - name: avg_yearly
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: avg_yearly
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml
new file mode 100644
index 0000000..303c8b4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml
@@ -0,0 +1,35 @@
+tables:
+  - name: q18
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q18.csv"
+    format:
+      type: csv
+      fields:
+        - name: c_name
+          type: VARCHAR
+        - name: c_custkey
+          type: BIGINT
+        - name: o_orderkey
+          type: BIGINT
+        - name: o_orderdate
+          type: DATE
+        - name: o_totalprice
+          type: DOUBLE
+        - name: sum(l_quantity)
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: c_name
+        type: VARCHAR
+      - name: c_custkey
+        type: BIGINT
+      - name: o_orderkey
+        type: BIGINT
+      - name: o_orderdate
+        type: DATE
+      - name: o_totalprice
+        type: DOUBLE
+      - name: sum(l_quantity)
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml
new file mode 100644
index 0000000..f84d177
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml
@@ -0,0 +1,15 @@
+tables:
+  - name: q19
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q19.csv"
+    format:
+      type: csv
+      fields:
+        - name: revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml
new file mode 100644
index 0000000..17f58b1
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml
@@ -0,0 +1,43 @@
+tables:
+  - name: q2
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q2.csv"
+    format:
+      type: csv
+      fields:
+        - name: s_acctbal
+          type: DOUBLE
+        - name: s_name
+          type: VARCHAR
+        - name: n_name
+          type: VARCHAR
+        - name: p_partkey
+          type: BIGINT
+        - name: p_mfgr
+          type: VARCHAR
+        - name: s_address
+          type: VARCHAR
+        - name: s_phone
+          type: VARCHAR
+        - name: s_comment
+          type: VARCHAR
+      field-delimiter: "|"
+    schema:
+      - name: s_acctbal
+        type: DOUBLE
+      - name: s_name
+        type: VARCHAR
+      - name: n_name
+        type: VARCHAR
+      - name: p_partkey
+        type: BIGINT
+      - name: p_mfgr
+        type: VARCHAR
+      - name: s_address
+        type: VARCHAR
+      - name: s_phone
+        type: VARCHAR
+      - name: s_comment
+        type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml
new file mode 100644
index 0000000..ca58a53
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q20
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q20.csv"
+    format:
+      type: csv
+      fields:
+        - name: s_name
+          type: VARCHAR
+        - name: s_address
+          type: VARCHAR
+      field-delimiter: "|"
+    schema:
+      - name: s_name
+        type: VARCHAR
+      - name: s_address
+        type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml
new file mode 100644
index 0000000..760573f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q21
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q21.csv"
+    format:
+      type: csv
+      fields:
+        - name: s_name
+          type: VARCHAR
+        - name: numwait
+          type: BIGINT
+      field-delimiter: "|"
+    schema:
+      - name: s_name
+        type: VARCHAR
+      - name: numwait
+        type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml
new file mode 100644
index 0000000..f39035e
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml
@@ -0,0 +1,23 @@
+tables:
+  - name: q22
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q22.csv"
+    format:
+      type: csv
+      fields:
+        - name: cntrycode
+          type: VARCHAR
+        - name: numcust
+          type: BIGINT
+        - name: totacctbal
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: cntrycode
+        type: VARCHAR
+      - name: numcust
+        type: BIGINT
+      - name: totacctbal
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml
new file mode 100644
index 0000000..8617e50
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml
@@ -0,0 +1,27 @@
+tables:
+  - name: q3
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q3.csv"
+    format:
+      type: csv
+      fields:
+        - name: l_orderkey
+          type: BIGINT
+        - name: revenue
+          type: DOUBLE
+        - name: o_orderdate
+          type: DATE
+        - name: o_shippriority
+          type: INT
+      field-delimiter: "|"
+    schema:
+      - name: l_orderkey
+        type: BIGINT
+      - name: revenue
+        type: DOUBLE
+      - name: o_orderdate
+        type: DATE
+      - name: o_shippriority
+        type: INT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml
new file mode 100644
index 0000000..1a05c98
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q4
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q4.csv"
+    format:
+      type: csv
+      fields:
+        - name: o_orderpriority
+          type: VARCHAR
+        - name: order_count
+          type: BIGINT
+      field-delimiter: "|"
+    schema:
+      - name: o_orderpriority
+        type: VARCHAR
+      - name: order_count
+        type: BIGINT
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml
new file mode 100644
index 0000000..845b131
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q5
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q5.csv"
+    format:
+      type: csv
+      fields:
+        - name: n_name
+          type: VARCHAR
+        - name: revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: n_name
+        type: VARCHAR
+      - name: revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml
new file mode 100644
index 0000000..3c0612f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml
@@ -0,0 +1,15 @@
+tables:
+  - name: q6
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q6.csv"
+    format:
+      type: csv
+      fields:
+        - name: revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml
new file mode 100644
index 0000000..d11da65
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml
@@ -0,0 +1,27 @@
+tables:
+  - name: q7
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q7.csv"
+    format:
+      type: csv
+      fields:
+        - name: supp_nation
+          type: VARCHAR
+        - name: cust_nation
+          type: VARCHAR
+        - name: l_year
+          type: BIGINT
+        - name: revenue
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: supp_nation
+        type: VARCHAR
+      - name: cust_nation
+        type: VARCHAR
+      - name: l_year
+        type: BIGINT
+      - name: revenue
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml
new file mode 100644
index 0000000..88b48df
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml
@@ -0,0 +1,19 @@
+tables:
+  - name: q8
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q8.csv"
+    format:
+      type: csv
+      fields:
+        - name: o_year
+          type: BIGINT
+        - name: mkt_share
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: o_year
+        type: BIGINT
+      - name: mkt_share
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml
new file mode 100644
index 0000000..b2030c7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml
@@ -0,0 +1,23 @@
+tables:
+  - name: q9
+    type: sink-table
+    connector:
+      type: filesystem
+      path: "$RESULT_DIR/q9.csv"
+    format:
+      type: csv
+      fields:
+        - name: nation
+          type: VARCHAR
+        - name: o_year
+          type: BIGINT
+        - name: sum_profit
+          type: DOUBLE
+      field-delimiter: "|"
+    schema:
+      - name: nation
+        type: VARCHAR
+      - name: o_year
+        type: BIGINT
+      - name: sum_profit
+        type: DOUBLE
diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml
new file mode 100644
index 0000000..83c1558
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml
@@ -0,0 +1,349 @@
+tables:
+  - name: customer
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/customer.csv"
+    format:
+      type: csv
+      fields:
+        - name: c_custkey
+          type: BIGINT
+        - name: c_name
+          type: VARCHAR
+        - name: c_address
+          type: VARCHAR
+        - name: c_nationkey
+          type: BIGINT
+        - name: c_phone
+          type: VARCHAR
+        - name: c_acctbal
+          type: DOUBLE
+        - name: c_mktsegment
+          type: VARCHAR
+        - name: c_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: c_custkey
+        type: BIGINT
+      - name: c_name
+        type: VARCHAR
+      - name: c_address
+        type: VARCHAR
+      - name: c_nationkey
+        type: BIGINT
+      - name: c_phone
+        type: VARCHAR
+      - name: c_acctbal
+        type: DOUBLE
+      - name: c_mktsegment
+        type: VARCHAR
+      - name: c_comment
+        type: VARCHAR
+  - name: lineitem
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/lineitem.csv"
+    format:
+      type: csv
+      fields:
+        - name: l_orderkey
+          type: BIGINT
+        - name: l_partkey
+          type: BIGINT
+        - name: l_suppkey
+          type: BIGINT
+        - name: l_linenumber
+          type: INT
+        - name: l_quantity
+          type: DOUBLE
+        - name: l_extendedprice
+          type: DOUBLE
+        - name: l_discount
+          type: DOUBLE
+        - name: l_tax
+          type: DOUBLE
+        - name: l_returnflag
+          type: VARCHAR
+        - name: l_linestatus
+          type: VARCHAR
+        - name: l_shipdate
+          type: DATE
+        - name: l_commitdate
+          type: DATE
+        - name: l_receiptdate
+          type: DATE
+        - name: l_shipinstruct
+          type: VARCHAR
+        - name: l_shipmode
+          type: VARCHAR
+        - name: l_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: l_orderkey
+        type: BIGINT
+      - name: l_partkey
+        type: BIGINT
+      - name: l_suppkey
+        type: BIGINT
+      - name: l_linenumber
+        type: INT
+      - name: l_quantity
+        type: DOUBLE
+      - name: l_extendedprice
+        type: DOUBLE
+      - name: l_discount
+        type: DOUBLE
+      - name: l_tax
+        type: DOUBLE
+      - name: l_returnflag
+        type: VARCHAR
+      - name: l_linestatus
+        type: VARCHAR
+      - name: l_shipdate
+        type: DATE
+      - name: l_commitdate
+        type: DATE
+      - name: l_receiptdate
+        type: DATE
+      - name: l_shipinstruct
+        type: VARCHAR
+      - name: l_shipmode
+        type: VARCHAR
+      - name: l_comment
+        type: VARCHAR
+  - name: nation
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/nation.csv"
+    format:
+      type: csv
+      fields:
+        - name: n_nationkey
+          type: BIGINT
+        - name: n_name
+          type: VARCHAR
+        - name: n_regionkey
+          type: BIGINT
+        - name: n_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: n_nationkey
+        type: BIGINT
+      - name: n_name
+        type: VARCHAR
+      - name: n_regionkey
+        type: BIGINT
+      - name: n_comment
+        type: VARCHAR
+  - name: orders
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/orders.csv"
+    format:
+      type: csv
+      fields:
+        - name: o_orderkey
+          type: BIGINT
+        - name: o_custkey
+          type: BIGINT
+        - name: o_orderstatus
+          type: VARCHAR
+        - name: o_totalprice
+          type: DOUBLE
+        - name: o_orderdate
+          type: DATE
+        - name: o_orderpriority
+          type: VARCHAR
+        - name: o_clerk
+          type: VARCHAR
+        - name: o_shippriority
+          type: INT
+        - name: o_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: o_orderkey
+        type: BIGINT
+      - name: o_custkey
+        type: BIGINT
+      - name: o_orderstatus
+        type: VARCHAR
+      - name: o_totalprice
+        type: DOUBLE
+      - name: o_orderdate
+        type: DATE
+      - name: o_orderpriority
+        type: VARCHAR
+      - name: o_clerk
+        type: VARCHAR
+      - name: o_shippriority
+        type: INT
+      - name: o_comment
+        type: VARCHAR
+  - name: part
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/part.csv"
+    format:
+      type: csv
+      fields:
+        - name: p_partkey
+          type: BIGINT
+        - name: p_name
+          type: VARCHAR
+        - name: p_mfgr
+          type: VARCHAR
+        - name: p_brand
+          type: VARCHAR
+        - name: p_type
+          type: VARCHAR
+        - name: p_size
+          type: INT
+        - name: p_container
+          type: VARCHAR
+        - name: p_retailprice
+          type: DOUBLE
+        - name: p_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: p_partkey
+        type: BIGINT
+      - name: p_name
+        type: VARCHAR
+      - name: p_mfgr
+        type: VARCHAR
+      - name: p_brand
+        type: VARCHAR
+      - name: p_type
+        type: VARCHAR
+      - name: p_size
+        type: INT
+      - name: p_container
+        type: VARCHAR
+      - name: p_retailprice
+        type: DOUBLE
+      - name: p_comment
+        type: VARCHAR
+  - name: partsupp
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/partsupp.csv"
+    format:
+      type: csv
+      fields:
+        - name: ps_partkey
+          type: BIGINT
+        - name: ps_suppkey
+          type: BIGINT
+        - name: ps_availqty
+          type: INT
+        - name: ps_supplycost
+          type: DOUBLE
+        - name: ps_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: ps_partkey
+        type: BIGINT
+      - name: ps_suppkey
+        type: BIGINT
+      - name: ps_availqty
+        type: INT
+      - name: ps_supplycost
+        type: DOUBLE
+      - name: ps_comment
+        type: VARCHAR
+  - name: region
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/region.csv"
+    format:
+      type: csv
+      fields:
+        - name: r_regionkey
+          type: BIGINT
+        - name: r_name
+          type: VARCHAR
+        - name: r_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: r_regionkey
+        type: BIGINT
+      - name: r_name
+        type: VARCHAR
+      - name: r_comment
+        type: VARCHAR
+  - name: supplier
+    type: source-table
+    update-mode: append
+    connector:
+      type: filesystem
+      path: "$TABLE_DIR/supplier.csv"
+    format:
+      type: csv
+      fields:
+        - name: s_suppkey
+          type: BIGINT
+        - name: s_name
+          type: VARCHAR
+        - name: s_address
+          type: VARCHAR
+        - name: s_nationkey
+          type: BIGINT
+        - name: s_phone
+          type: VARCHAR
+        - name: s_acctbal
+          type: DOUBLE
+        - name: s_comment
+          type: VARCHAR
+      field-delimiter: "|"
+      line-delimiter: "\n"
+      comment-prefix: "--"
+    schema:
+      - name: s_suppkey
+        type: BIGINT
+      - name: s_name
+        type: VARCHAR
+      - name: s_address
+        type: VARCHAR
+      - name: s_nationkey
+        type: BIGINT
+      - name: s_phone
+        type: VARCHAR
+      - name: s_acctbal
+        type: DOUBLE
+      - name: s_comment
+        type: VARCHAR
diff --git a/flink-end-to-end-tests/test-scripts/test_tpch.sh b/flink-end-to-end-tests/test-scripts/test_tpch.sh
new file mode 100755
index 0000000..ab4c61c
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_tpch.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+SCALE="0.01"
+
+source "$(dirname "$0")"/common.sh
+
+################################################################################
+# Generate test data
+################################################################################
+
+echo "Generating test data..."
+
+TARGET_DIR="$END_TO_END_DIR/flink-tpch-test/target"
+TPCH_DATA_DIR="$END_TO_END_DIR/test-scripts/test-data/tpch"
+java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpch.TpchDataGenerator "$SCALE" "$TARGET_DIR"
+
+################################################################################
+# Prepare Flink
+################################################################################
+
+echo "Preparing Flink..."
+
+start_cluster
+
+################################################################################
+# Run SQL statements
+################################################################################
+
+TABLE_DIR="$TARGET_DIR/table"
+ORIGIN_QUERY_DIR="$TARGET_DIR/query"
+MODIFIED_QUERY_DIR="$TPCH_DATA_DIR/modified-query"
+EXPECTED_DIR="$TARGET_DIR/expected"
+RESULT_DIR="$TEST_DATA_DIR/result"
+SQL_CONF="$TEST_DATA_DIR/sql-client-session.conf"
+
+mkdir "$RESULT_DIR"
+
+SOURCES_YAML=$(cat "$TPCH_DATA_DIR/source.yaml")
+SOURCES_YAML=${SOURCES_YAML//\$TABLE_DIR/"$TABLE_DIR"}
+
+for i in {1..22}
+do
+    echo "Running query #$i..."
+
+    # Drop the sink yaml's first line ('tables:') so its entry merges under the 'tables:' list from source.yaml
+    SINK_YAML=$(tail -n +2 "$TPCH_DATA_DIR/sink/q${i}.yaml")
+    SINK_YAML=${SINK_YAML//\$RESULT_DIR/"$RESULT_DIR"}
+
+    cat > "$SQL_CONF" << EOF
+${SOURCES_YAML}
+${SINK_YAML}
+execution:
+  planner: blink
+  type: batch
+  result-mode: table
+EOF
+
+    if [[ -e "$MODIFIED_QUERY_DIR/q$i.sql" ]]
+    then
+        SQL_STATEMENT="INSERT INTO q$i $(cat "$MODIFIED_QUERY_DIR/q$i.sql")"
+    else
+        SQL_STATEMENT="INSERT INTO q$i $(cat "$ORIGIN_QUERY_DIR/q$i.sql")"
+    fi
+
+    JOB_ID=$("$FLINK_DIR/bin/sql-client.sh" embedded \
+        --environment "$SQL_CONF" \
+        --update "$SQL_STATEMENT" | grep "Job ID:" | sed 's/.* //g')
+
+    wait_job_terminal_state "$JOB_ID" "FINISHED"
+
+    java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar" org.apache.flink.table.tpch.TpchResultComparator "$EXPECTED_DIR/q$i.csv" "$RESULT_DIR/q$i.csv"
+done
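
The script wires everything together: it generates the tables, the original queries and the expected results at scale factor 0.01, builds a per-query SQL Client environment file from source.yaml plus the sink yaml (minus its first line), submits "INSERT INTO q$i ..." via sql-client.sh embedded with the Blink planner in batch mode, waits for the job to reach FINISHED, and then checks the sink file against the expected file. The same two entry points can also be driven by hand; a minimal sketch (not part of the patch, with hypothetical paths, and assuming the airlift tpch jar copied into target/lib by the pom is on the classpath):

/** Illustrative only: run the generator and the comparator outside of test_tpch.sh. */
public class ManualTpchCheck {
    public static void main(String[] args) throws Exception {
        // Writes table/, query/ and expected/ into /tmp/tpch (the directory must already exist).
        org.apache.flink.table.tpch.TpchDataGenerator.main(new String[] {"0.01", "/tmp/tpch"});

        // Compares an expected result with a (hypothetical) SQL Client sink file;
        // the comparator prints the first mismatch and exits with code 1 on failure.
        org.apache.flink.table.tpch.TpchResultComparator.main(new String[] {
            "/tmp/tpch/expected/q1.csv",
            "/tmp/tpch/result/q1.csv"
        });
    }
}
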
diff --git a/pom.xml b/pom.xml
index 44e21ac..ef17b18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1403,7 +1403,7 @@ under the License.
 						<exclude>flink-table/flink-table-planner-blink/src/test/resources/digest/*.out</exclude>
 						<exclude>flink-table/flink-table-planner-blink/src/test/resources/explain/*.out</exclude>
 						<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
-						<exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude>
+						<exclude>flink-end-to-end-tests/test-scripts/test-data/**</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude>
 						<exclude>flink-connectors/flink-connector-kafka/src/test/resources/**</exclude>
 						<exclude>flink-connectors/flink-connector-kafka-0.11/src/test/resources/**</exclude>