Posted to user-zh@flink.apache.org by McClone <ab...@163.com> on 2020/10/16 13:31:55 UTC

Question about Hive streaming

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().addConfiguration(
        new Configuration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30)));
tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS 'org.example.flink.TestFunca' LANGUAGE JAVA");
tEnv.executeSql("CREATE TABLE datagen (\n" +
        " name STRING,\n" +
        " pass STRING,\n" +
        " type1 INT,\n" +
        " t1 STRING,\n" +
        " t2 STRING,\n" +
        " ts AS localtimestamp,\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'rows-per-second'='1',\n" +
        " 'fields.type1.min'='1',\n" +
        " 'fields.type1.max'='10'\n" +
        ")");

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (\n" +
        "  user_id STRING,\n" +
        "  order_amount STRING\n" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
        "  'sink.partition-commit.trigger'='partition-time',\n" +
        "  'sink.partition-commit.delay'='1 h',\n" +
        "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
        ")");

tEnv.executeSql("insert into hive_table select t1,t2,TestFunca(type1),TestFunca(type1) from datagen");

Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
	... 18 more


Re: Question about Hive streaming

Posted by hailongwang <18...@163.com>.
Hi McClone:
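You need to register a HiveCatalog first. The Hive table factory only works through the catalog, so a Hive table created in the default in-memory catalog is treated as a regular Flink table, and the planner then looks for a 'connector' option that it does not have.
For a working example, see HiveTableSinkITCase#testStreamingWrite (HiveTableSinkITCase.java).
Hope this helps!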
Best,
Hailong Wang
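
A minimal sketch of the suggestion above, assuming the flink-connector-hive and Hive dependencies are on the classpath and a Hive metastore is reachable. The catalog name "myhive", the database "default", and the path "/opt/hive-conf" are placeholders that do not come from this thread; point the path at the directory that holds your hive-site.xml.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder values (assumptions, not from the thread): adjust to your setup.
        String catalogName = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/hive-conf"; // directory containing hive-site.xml

        // Register the Hive catalog and make it current BEFORE creating the Hive table,
        // so the table is stored in the metastore and handled by the Hive factory.
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);

        // Hive dialect for the Hive DDL (same table definition as in the question).
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS hive_table (\n" +
                "  user_id STRING,\n" +
                "  order_amount STRING\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 h',\n" +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");

        // Switch back to the default dialect for Flink-style DDL and the INSERT.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    }
}
```

Note that a table created before the useCatalog call (such as datagen in the original code) lives in default_catalog.default_database, so either create it after switching catalogs or refer to it by its fully qualified name in the INSERT statement.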
