You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/07 02:28:31 UTC
[flink] 02/02: [FLINK-15935][example] Add Streaming Window SQL example
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fa9d559ffbee7b7595ce1ab632710937e1b79655
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Feb 6 16:39:34 2020 +0800
[FLINK-15935][example] Add Streaming Window SQL example
---
.../examples/java/StreamWindowSQLExample.java | 98 ++++++++++++++++++++++
1 file changed, 98 insertions(+)
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
new file mode 100644
index 0000000..4b1acf9
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+
+/**
+ * Simple example for demonstrating the use of SQL in Java.
+ *
+ * <p>This example shows how to:
+ * <ul><li>Register a table via DDL</li>
+ * <li>Declare an event time attribute in the DDL</li>
+ * <li>Run a streaming window aggregate on the registered table</li></ul>
+ */
+public class StreamWindowSQLExample {
+
+    /** Aggregates sample orders from a temporary CSV file over 5-second tumbling windows. */
+    public static void main(String[] args) throws Exception {
+
+        // set up execution environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // use blink planner in streaming mode,
+        // because watermark statement is only available in blink planner.
+        EnvironmentSettings settings = EnvironmentSettings.newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+
+        // write the source data into a temporary file and get its URI for the connector path
+        String contents = "1,beer,3,2019-12-12 00:00:01\n" +
+            "1,diaper,4,2019-12-12 00:00:02\n" +
+            "2,pen,3,2019-12-12 00:00:04\n" +
+            "2,rubber,3,2019-12-12 00:00:06\n" +
+            "3,rubber,2,2019-12-12 00:00:05\n" +
+            "4,beer,1,2019-12-12 00:00:08";
+        File tempFile = File.createTempFile("orders", ".csv");
+        tempFile.deleteOnExit();
+        FileUtils.writeFileUtf8(tempFile, contents);
+        String path = tempFile.toURI().toString();
+
+        // register the table via DDL with a watermark; the events are out of order,
+        // hence the watermark lags 3 seconds behind the event time to wait for late events
+        String ddl = "CREATE TABLE orders (\n" +
+            " user_id INT,\n" +
+            " product STRING,\n" +
+            " amount INT,\n" +
+            " ts TIMESTAMP(3),\n" +
+            " WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n" +
+            ") WITH (\n" +
+            " 'connector.type' = 'filesystem',\n" +
+            " 'connector.path' = '" + path + "',\n" +
+            " 'format.type' = 'csv'\n" +
+            ")";
+        tEnv.sqlUpdate(ddl);
+
+        // run a SQL query on the table and retrieve the result as a new Table
+        String query = "SELECT\n" +
+            " CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS STRING) window_start,\n" +
+            " COUNT(*) order_num,\n" +
+            " SUM(amount) total_amount,\n" +
+            " COUNT(DISTINCT product) unique_products\n" +
+            "FROM orders\n" +
+            "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
+        Table result = tEnv.sqlQuery(query);
+        tEnv.toAppendStream(result, Row.class).print();
+
+        // submit the job
+        tEnv.execute("Streaming Window SQL Job");
+
+        // should output:
+        // 2019-12-12 00:00:00.000,3,10,3
+        // 2019-12-12 00:00:05.000,3,6,2
+    }
+}