You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kcz <57...@qq.com> on 2020/07/29 07:07:26 UTC

flink-1.11 hive-1.2.1 ddl 无法写入数据

确认数据源有数据,全部代码如下,但是hive就是没有数据

package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

/**
 * Streaming job that reads JSON records from a Kafka topic and writes them into a
 * partitioned Parquet Hive table through the Flink Table API.
 *
 * <p>The job registers a {@code HiveCatalog}, (re)creates the Kafka source table and the
 * Hive sink table inside the {@code situation} database, and then submits a continuous
 * {@code INSERT INTO ... SELECT} statement.
 */
public class HiveTest {
    /** Location passed to the {@code FsStateBackend} for checkpoint data. */
    private static final String CHECKPOINT_PATH = "hdfs_path";

    /**
     * Builds the table environment, declares source and sink tables, and submits the insert job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "work");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Allow only one checkpoint to be in progress at any time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setStateBackend(new FsStateBackend(CHECKPOINT_PATH));
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing is configured on the table config: exactly-once, every 20 seconds.
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String name            = "myhive";
        String defaultDatabase = "situation";
        String hiveConfDir     = "/load/data/hive/hive-conf"; // a local path
        String version         = "1.2.1";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog(name, hive);

        // Set the HiveCatalog as the current catalog of the session.
        tableEnv.useCatalog(name);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");

        // Kafka source table, declared with the default SQL dialect.
        // NOTE(review): these are the older 'connector.type'-style option keys; consider
        // migrating to the Flink 1.11 option format — confirm against the Kafka connector docs.
        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
                "\thost STRING,\n" +
                "\turl STRING,\n" +
                "\tpublic_date STRING\n" +
                ") WITH (\n" +
                "\t'connector.type' = 'kafka',\n" +
                "\t'connector.version' = 'universal',\n" +
                "\t'connector.startup-mode' = 'latest-offset',\n" +
                "\t'connector.topic' = 'sendMessage',\n" +
                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "\t'update-mode' = 'append',\n" +
                "\t'format.type' = 'json',\n" +
                "\t'format.derive-schema' = 'true'\n" +
                ")");

        // The Hive sink table is created with the Hive dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

        // The commit trigger value must be spelled 'process-time' (with a hyphen);
        // the earlier spelling 'process time' is not a valid value and prevented data
        // from being written to Hive.
        String hiveSql = "\n" +
                "  CREATE TABLE situation.fs_table (\n" +
                " \n" +
                "    host STRING,\n" +
                "    url STRING,\n" +
                "    public_date STRING\n" +
                "  \n" +
                "  ) PARTITIONED BY (\n" +
                "    ts_date STRING,\n" +
                "    ts_hour STRING,\n" +
                "    ts_minute STRING\n" +
                "  ) STORED AS PARQUET\n" +
                "  TBLPROPERTIES (\n" +
                "    'sink.partition-commit.trigger' = 'process-time',\n" +
                "    'sink.partition-commit.delay' = '1 min',\n" +
                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                "  )\n" +
                "  ";
        tableEnv.executeSql(hiveSql);

        // Switch back to the default dialect for the INSERT statement.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Derive the three partition columns (date, hour, minute) from public_date.
        tableEnv.executeSql("INSERT INTO  situation.fs_table SELECT host, url,public_date," +
                " DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm')  FROM situation.source_table");
    }
}

Re:flink-1.11 hive-1.2.1 ddl 无法写入数据

Posted by hailongwang <18...@163.com>.


有什么异常信息吗




在 2020-07-29 14:07:26,"kcz" <57...@qq.com> 写道:
>确认数据源有数据,全部代码如下,但是hive就是没有数据
>
>package com.hive;
>
>import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>import org.apache.flink.streaming.api.CheckpointingMode;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.SqlDialect;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.catalog.hive.HiveCatalog;
>
>import java.time.Duration;
>
>public class HiveTest {
>    private static final String path = "hdfs_path";
>    public static void main(String []args)  {
>        System.setProperty("HADOOP_USER_NAME", "work");
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>        // 同一时间只允许进行一个检查点
>        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>        env.setStateBackend(new FsStateBackend(path));
>        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>                .useBlinkPlanner()
>                .inStreamingMode()
>                .build();
>        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,tableEnvSettings);
>        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
>        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
>
>        String name            = "myhive";
>        String defaultDatabase = "situation";
>        String hiveConfDir     = "/load/data/hive/hive-conf"; // a local path
>        String version         = "1.2.1";
>
>        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
>        tableEnv.registerCatalog("myhive", hive);
>
>// set the HiveCatalog as the current catalog of the session
>        tableEnv.useCatalog("myhive");
>        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
>        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
>
>
>        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>                "\thost STRING,\n" +
>                "\turl STRING,\n" +
>                "\tpublic_date STRING\n" +
>                ") WITH (\n" +
>                "\t'connector.type' = 'kafka',\n" +
>                "\t'connector.version' = 'universal',\n" +
>                "\t'connector.startup-mode' = 'latest-offset',\n" +
>                "\t'connector.topic' = 'sendMessage',\n" +
>                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
>                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>                "\t'update-mode' = 'append',\n" +
>                "\t'format.type' = 'json',\n" +
>                "\t'format.derive-schema' = 'true'\n" +
>                ")");
>
>        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
>
>        String hiveSql = "\n" +
>                "  CREATE TABLE situation.fs_table (\n" +
>                " \n" +
>                "    host STRING,\n" +
>                "    url STRING,\n" +
>                "    public_date STRING\n" +
>                "  \n" +
>                "  ) PARTITIONED BY (\n" +
>                "    ts_date STRING,\n" +
>                "    ts_hour STRING,\n" +
>                "    ts_minute STRING\n" +
>                "  ) STORED AS PARQUET\n" +
>                "  TBLPROPERTIES (\n" +
>                "    'sink.partition-commit.trigger' = 'process time',\n" +
>                "    'sink.partition-commit.delay' = '1 min',\n" +
>                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>                "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>                "  )\n" +
>                "  ";
>        tableEnv.executeSql(hiveSql);
>
>        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
>        tableEnv.executeSql("INSERT INTO  situation.fs_table SELECT host, url,public_date," +
>                " DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm')  FROM situation.source_table");
>
>
>
>    }
>}

回复: flink-1.11 hive-1.2.1 ddl 无法写入数据

Posted by kcz <57...@qq.com>.
sorry,我把idea的log4j弄坏了,没有出现错误提示,我下面的process-time 写成了 process time。改了log提示之后,有清楚的提示了。




------------------ 原始邮件 ------------------
发件人: "user-zh" <xbjtdcq@gmail.com>;
发送时间: 2020年7月30日(星期四) 凌晨0:14
收件人: "user-zh"<user-zh@flink.apache.org>;

主题: Re: flink-1.11 hive-1.2.1 ddl 无法写入数据



Hi, kcz

看connector的properties还是1.10的格式,你换成1.11试试[1].


> 在 2020年7月29日,15:07,kcz <573693104@qq.com> 写道:
> 
>  tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>                "\thost STRING,\n" +
>                "\turl STRING,\n" +
>                "\tpublic_date STRING\n" +
>                ") WITH (\n" +
>                "\t'connector.type' = 'kafka',\n" +
>                "\t'connector.version' = 'universal',\n" +
>                "\t'connector.startup-mode' = 'latest-offset',\n" +
>                "\t'connector.topic' = 'sendMessage',\n" +
>                "\t'connector.properties.group.id <http://connector.properties.group.id/>' = 'domain_testGroup',\n" +
>                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>                "\t'update-mode' = 'append',\n" +
>                "\t'format.type' = 'json',\n" +
>                "\t'format.derive-schema' = 'true'\n" +
>                ")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options>

Re: flink-1.11 hive-1.2.1 ddl 无法写入数据

Posted by Leonard Xu <xb...@gmail.com>.
Hi, kcz

看connector的properties还是1.10的格式,你换成1.11试试[1].


> 在 2020年7月29日,15:07,kcz <57...@qq.com> 写道:
> 
>  tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>                "\thost STRING,\n" +
>                "\turl STRING,\n" +
>                "\tpublic_date STRING\n" +
>                ") WITH (\n" +
>                "\t'connector.type' = 'kafka',\n" +
>                "\t'connector.version' = 'universal',\n" +
>                "\t'connector.startup-mode' = 'latest-offset',\n" +
>                "\t'connector.topic' = 'sendMessage',\n" +
>                "\t'connector.properties.group.id <http://connector.properties.group.id/>' = 'domain_testGroup',\n" +
>                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>                "\t'update-mode' = 'append',\n" +
>                "\t'format.type' = 'json',\n" +
>                "\t'format.derive-schema' = 'true'\n" +
>                ")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options>