You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kcz <57...@qq.com> on 2020/07/29 07:07:26 UTC
flink-1.11 hive-1.2.1 ddl 无法写入数据
确认数据源有数据,全部代码如下,但是hive就是没有数据
package com.hive;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.time.Duration;
/**
 * Streaming job that reads JSON records from a Kafka topic and writes them into a
 * partitioned Parquet Hive table through a {@link HiveCatalog}.
 *
 * <p>Both tables are (re)created on every run inside the {@code situation} database of the
 * {@code myhive} catalog: {@code source_table} (Kafka source, default dialect) and
 * {@code fs_table} (Hive sink, Hive dialect).
 */
public class HiveTest {

    /** Location handed to {@link FsStateBackend}; placeholder value, replace with a real URI. */
    private static final String CHECKPOINT_PATH = "hdfs_path";

    /**
     * Builds the table environment, registers the Hive catalog, creates the source and sink
     * tables and submits the continuous {@code INSERT INTO} job.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "work");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Allow only one checkpoint to be in progress at any time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend(CHECKPOINT_PATH));

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing is configured through the table config: exactly-once, every 20 seconds.
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String name = "myhive";
        String defaultDatabase = "situation";
        String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
        String version = "1.2.1";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");

        // Kafka source table, created with the default dialect.
        // NOTE(review): these are the legacy 1.10-style 'connector.*' option keys; consider
        // migrating to the 1.11 option names ('connector' = 'kafka', 'topic', ...).
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
                "\thost STRING,\n" +
                "\turl STRING,\n" +
                "\tpublic_date STRING\n" +
                ") WITH (\n" +
                "\t'connector.type' = 'kafka',\n" +
                "\t'connector.version' = 'universal',\n" +
                "\t'connector.startup-mode' = 'latest-offset',\n" +
                "\t'connector.topic' = 'sendMessage',\n" +
                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "\t'update-mode' = 'append',\n" +
                "\t'format.type' = 'json',\n" +
                "\t'format.derive-schema' = 'true'\n" +
                ")");

        // The Hive sink table must be declared with the Hive dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
        // FIX: the commit trigger must be 'process-time' (hyphenated). The originally posted
        // code used 'process time', which is not a valid trigger value, so the sink failed and
        // nothing was written to Hive.
        String hiveSql = "\n" +
                " CREATE TABLE situation.fs_table (\n" +
                " \n" +
                " host STRING,\n" +
                " url STRING,\n" +
                " public_date STRING\n" +
                " \n" +
                " ) PARTITIONED BY (\n" +
                " ts_date STRING,\n" +
                " ts_hour STRING,\n" +
                " ts_minute STRING\n" +
                " ) STORED AS PARQUET\n" +
                " TBLPROPERTIES (\n" +
                " 'sink.partition-commit.trigger' = 'process-time',\n" +
                " 'sink.partition-commit.delay' = '1 min',\n" +
                " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                " 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                " )\n" +
                " ";
        tableEnv.executeSql(hiveSql);

        // Switch back to the default dialect for the streaming INSERT; the three partition
        // columns (date, hour, minute) are derived from public_date.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url,public_date," +
                " DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm') FROM situation.source_table");
    }
}
Re:flink-1.11 hive-1.2.1 ddl 无法写入数据
Posted by hailongwang <18...@163.com>.
有什么异常信息吗
在 2020-07-29 14:07:26,"kcz" <57...@qq.com> 写道:
>确认数据源有数据,全部代码如下,但是hive就是没有数据
>
>package com.hive;
>
>import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>import org.apache.flink.streaming.api.CheckpointingMode;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.SqlDialect;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.catalog.hive.HiveCatalog;
>
>import java.time.Duration;
>
>public class HiveTest {
> private static final String path = "hdfs_path";
> public static void main(String []args) {
> System.setProperty("HADOOP_USER_NAME", "work");
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> // 同一时间只允许进行一个检查点
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.setStateBackend(new FsStateBackend(path));
> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,tableEnvSettings);
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
>
> String name = "myhive";
> String defaultDatabase = "situation";
> String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
> String version = "1.2.1";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
> tableEnv.registerCatalog("myhive", hive);
>
>// set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog("myhive");
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
> tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
>
>
> tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
> "\thost STRING,\n" +
> "\turl STRING,\n" +
> "\tpublic_date STRING\n" +
> ") WITH (\n" +
> "\t'connector.type' = 'kafka',\n" +
> "\t'connector.version' = 'universal',\n" +
> "\t'connector.startup-mode' = 'latest-offset',\n" +
> "\t'connector.topic' = 'sendMessage',\n" +
> "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
> "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
> "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
> "\t'update-mode' = 'append',\n" +
> "\t'format.type' = 'json',\n" +
> "\t'format.derive-schema' = 'true'\n" +
> ")");
>
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
>
> String hiveSql = "\n" +
> " CREATE TABLE situation.fs_table (\n" +
> " \n" +
> " host STRING,\n" +
> " url STRING,\n" +
> " public_date STRING\n" +
> " \n" +
> " ) PARTITIONED BY (\n" +
> " ts_date STRING,\n" +
> " ts_hour STRING,\n" +
> " ts_minute STRING\n" +
> " ) STORED AS PARQUET\n" +
> " TBLPROPERTIES (\n" +
> " 'sink.partition-commit.trigger' = 'process time',\n" +
> " 'sink.partition-commit.delay' = '1 min',\n" +
> " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
> " 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
> " )\n" +
> " ";
> tableEnv.executeSql(hiveSql);
>
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
> tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url,public_date," +
> " DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm') FROM situation.source_table");
>
>
>
> }
>}
回复: flink-1.11 hive-1.2.1 ddl 无法写入数据
Posted by kcz <57...@qq.com>.
sorry,我把idea的log4j弄坏了,没有出现错误提示,我下面的process-time 写成了 process time。改了log提示之后,有清楚的提示了。
------------------ 原始邮件 ------------------
发件人: "user-zh" <xbjtdcq@gmail.com>;
发送时间: 2020年7月30日(星期四) 凌晨0:14
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: flink-1.11 hive-1.2.1 ddl 无法写入数据
Hi, kcz
看connector的properties还是1.10的格式,你换成1.11试试[1].
> 在 2020年7月29日,15:07,kcz <573693104@qq.com> 写道:
>
> tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
> "\thost STRING,\n" +
> "\turl STRING,\n" +
> "\tpublic_date STRING\n" +
> ") WITH (\n" +
> "\t'connector.type' = 'kafka',\n" +
> "\t'connector.version' = 'universal',\n" +
> "\t'connector.startup-mode' = 'latest-offset',\n" +
> "\t'connector.topic' = 'sendMessage',\n" +
> "\t'connector.properties.group.id <http://connector.properties.group.id/>' = 'domain_testGroup',\n" +
> "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
> "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
> "\t'update-mode' = 'append',\n" +
> "\t'format.type' = 'json',\n" +
> "\t'format.derive-schema' = 'true'\n" +
> ")");
Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options>
Re: flink-1.11 hive-1.2.1 ddl 无法写入数据
Posted by Leonard Xu <xb...@gmail.com>.
Hi, kcz
看connector的properties还是1.10的格式,你换成1.11试试[1].
> 在 2020年7月29日,15:07,kcz <57...@qq.com> 写道:
>
> tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
> "\thost STRING,\n" +
> "\turl STRING,\n" +
> "\tpublic_date STRING\n" +
> ") WITH (\n" +
> "\t'connector.type' = 'kafka',\n" +
> "\t'connector.version' = 'universal',\n" +
> "\t'connector.startup-mode' = 'latest-offset',\n" +
> "\t'connector.topic' = 'sendMessage',\n" +
> "\t'connector.properties.group.id <http://connector.properties.group.id/>' = 'domain_testGroup',\n" +
> "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
> "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
> "\t'update-mode' = 'append',\n" +
> "\t'format.type' = 'json',\n" +
> "\t'format.derive-schema' = 'true'\n" +
> ")");
Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options>