Posted to user-zh@flink.apache.org by jack <ws...@163.com> on 2020/06/16 05:47:32 UTC

Problem connecting PyFlink to Elasticsearch 5.4

I am working from an example that connects PyFlink to Elasticsearch. The ES version on my side is 5.4.1 and PyFlink is 1.10.1; the connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I have also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
When connecting to ES, the job fails with "findAndCreateTableSink failed".
Could this be caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.


Caused by: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch






from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch




def area_cnts():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)


    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())


    # register source and sink
    register_rides_source(st_env)
    register_cnt_sink(st_env)


    # query
    st_env.from_path("source")\
        .group_by("taxiId")\
        .select("taxiId, count(1) as cnt")\
        .insert_into("sink")


    # execute
    st_env.execute("6-write_with_elasticsearch")




def register_rides_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
        Kafka()
            .version("universal")
            .topic("Rides")
            .start_from_earliest()
            .property("zookeeper.connect", "zookeeper:2181")
            .property("bootstrap.servers", "kafka:9092")) \
        .with_format(  # declare a format for this system
        Json()
            .fail_on_missing_field(True)
            .schema(DataTypes.ROW([
            DataTypes.FIELD("rideId", DataTypes.BIGINT()),
            DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
            DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
            DataTypes.FIELD("lon", DataTypes.FLOAT()),
            DataTypes.FIELD("lat", DataTypes.FLOAT()),
            DataTypes.FIELD("psgCnt", DataTypes.INT()),
            DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
        .with_schema(  # declare the schema of the table
        Schema()
            .field("rideId", DataTypes.BIGINT())
            .field("taxiId", DataTypes.BIGINT())
            .field("isStart", DataTypes.BOOLEAN())
            .field("lon", DataTypes.FLOAT())
            .field("lat", DataTypes.FLOAT())
            .field("psgCnt", DataTypes.INT())
            .field("rideTime", DataTypes.TIMESTAMP())
            .rowtime(
            Rowtime()
                .timestamps_from_field("eventTime")
                .watermarks_periodic_bounded(60000))) \
        .in_append_mode() \
        .register_table_source("source")




def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
            .version("6")
            .host("elasticsearch", 9200, "http")
            .index("taxiid-cnts")
            .document_type('taxiidcnt')
            .key_delimiter("$")) \
        .with_schema(
            Schema()
                .field("taxiId", DataTypes.BIGINT())
                .field("cnt", DataTypes.BIGINT())) \
        .with_format(
           Json()
               .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")




if __name__ == '__main__':
    area_cnts()
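
For what it's worth, the "Required context properties mismatch" above comes from the factory lookup: the descriptor is flattened into string properties, and every TableSinkFactory on the classpath is asked whether its required context matches them. A rough sketch of what the sink descriptor above should flatten to (key names reconstructed from the Flink 1.10 connector documentation, so treat them as an approximation):

# Approximate properties produced by the Elasticsearch() sink descriptor;
# "connector.version" is the value each candidate factory is matched against,
# which is why a 5.x jar cannot serve a descriptor declaring version "6".
es_sink_properties = {
    "connector.type": "elasticsearch",
    "connector.version": "6",
    "connector.hosts": "http://elasticsearch:9200",
    "connector.index": "taxiid-cnts",
    "connector.document-type": "taxiidcnt",
    "connector.key-delimiter": "$",
    "update-mode": "upsert",
    "format.type": "json",
    "format.derive-schema": "true",
}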


Re: Problem connecting PyFlink to Elasticsearch 5.4

Posted by Jark Wu <im...@gmail.com>.
Hi,

As far as I know, Flink 1.10 does not officially provide a SQL connector for Elasticsearch 5.x.
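
If an Elasticsearch 6.x or 7.x cluster is an option, the descriptor in register_cnt_sink can stay at version("6"); the missing piece is the matching jar. A minimal sketch for locating where PyFlink 1.10 typically loads connector jars from (this assumes a pip-installed PyFlink; the path is an assumption, not something confirmed in this thread):

# Hypothetical helper: PyFlink 1.10 picks up connector jars from the lib/
# directory bundled with the pyflink package, so the elasticsearch6 SQL
# connector jar (flink-sql-connector-elasticsearch6_2.11-1.10.1.jar) goes there.
import os
import pyflink

lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
print("copy the connector jar into:", lib_dir)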

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu <di...@gmail.com> wrote:

> Could you send the complete exception?
>
> On Jun 16, 2020, at 3:45 PM, jack <ws...@163.com> wrote:
>
> I have already changed the version in the connect section to 5 locally, and got the error below:
>
> >>     st_env.connect(
> >>         Elasticsearch()
> >>             .version("5")
> >>             .host("localhost", 9200, "http")
> >>             .index("taxiid-cnts")
> >>             .document_type('taxiidcnt')
> >>             .key_delimiter("$")) \
>
>
>
>
>
>
> At 2020-06-16 15:38:28, "Dian Fu" <di...@gmail.com> wrote:
> >I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
> >
> >> On Jun 16, 2020, at 1:47 PM, jack <ws...@163.com> wrote:
> >>
> >> I am working from an example that connects PyFlink to Elasticsearch. The ES version on my side is 5.4.1 and PyFlink is 1.10.1; the connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I have also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
> >> When connecting to ES, the job fails with "findAndCreateTableSink failed".
> >> Could this be caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.
> >>
> >> Caused by: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
> >> Reason: Required context properties mismatch
> >>
> >>
> >>
> >> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch
> >>
> >>
> >> def area_cnts():
> >>     s_env = StreamExecutionEnvironment.get_execution_environment()
> >>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >>     s_env.set_parallelism(1)
> >>
> >>     # use blink table planner
> >>     st_env = StreamTableEnvironment \
> >>         .create(s_env, environment_settings=EnvironmentSettings
> >>                 .new_instance()
> >>                 .in_streaming_mode()
> >>                 .use_blink_planner().build())
> >>
> >>     # register source and sink
> >>     register_rides_source(st_env)
> >>     register_cnt_sink(st_env)
> >>
> >>     # query
> >>     st_env.from_path("source")\
> >>         .group_by("taxiId")\
> >>         .select("taxiId, count(1) as cnt")\
> >>         .insert_into("sink")
> >>
> >>     # execute
> >>     st_env.execute("6-write_with_elasticsearch")
> >>
> >>
> >> def register_rides_source(st_env):
> >>     st_env \
> >>         .connect(  # declare the external system to connect to
> >>         Kafka()
> >>             .version("universal")
> >>             .topic("Rides")
> >>             .start_from_earliest()
> >>             .property("zookeeper.connect", "zookeeper:2181")
> >>             .property("bootstrap.servers", "kafka:9092")) \
> >>         .with_format(  # declare a format for this system
> >>         Json()
> >>             .fail_on_missing_field(True)
> >>             .schema(DataTypes.ROW([
> >>             DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >>             DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >>             DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >>             DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >>             DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >>             DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >>             DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >>         .with_schema(  # declare the schema of the table
> >>         Schema()
> >>             .field("rideId", DataTypes.BIGINT())
> >>             .field("taxiId", DataTypes.BIGINT())
> >>             .field("isStart", DataTypes.BOOLEAN())
> >>             .field("lon", DataTypes.FLOAT())
> >>             .field("lat", DataTypes.FLOAT())
> >>             .field("psgCnt", DataTypes.INT())
> >>             .field("rideTime", DataTypes.TIMESTAMP())
> >>             .rowtime(
> >>             Rowtime()
> >>                 .timestamps_from_field("eventTime")
> >>                 .watermarks_periodic_bounded(60000))) \
> >>         .in_append_mode() \
> >>         .register_table_source("source")
> >>
> >>
> >> def register_cnt_sink(st_env):
> >>     st_env.connect(
> >>         Elasticsearch()
> >>             .version("6")
> >>             .host("elasticsearch", 9200, "http")
> >>             .index("taxiid-cnts")
> >>             .document_type('taxiidcnt')
> >>             .key_delimiter("$")) \
> >>         .with_schema(
> >>             Schema()
> >>                 .field("taxiId", DataTypes.BIGINT())
> >>                 .field("cnt", DataTypes.BIGINT())) \
> >>         .with_format(
> >>            Json()
> >>                .derive_schema()) \
> >>         .in_upsert_mode() \
> >>         .register_table_sink("sink")
> >>
> >>
> >> if __name__ == '__main__':
> >>     area_cnts()
> >>
> >
>
>
>


Re: Problem connecting PyFlink to Elasticsearch 5.4

Posted by Dian Fu <di...@gmail.com>.
Could you send the complete exception?

> On Jun 16, 2020, at 3:45 PM, jack <ws...@163.com> wrote:
> 
> I have already changed the version in the connect section to 5 locally, and got the error below:
> >>     st_env.connect(
> >>         Elasticsearch()
> >>             .version("5")
> >>             .host("localhost", 9200, "http")
> >>             .index("taxiid-cnts")
> >>             .document_type('taxiidcnt')
> >>             .key_delimiter("$")) \
> 
> 
> 
> 
> 
> At 2020-06-16 15:38:28, "Dian Fu" <di...@gmail.com> wrote:
> >I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
> >
> >> On Jun 16, 2020, at 1:47 PM, jack <ws...@163.com> wrote:
> >> 
> >> I am working from an example that connects PyFlink to Elasticsearch. The ES version on my side is 5.4.1 and PyFlink is 1.10.1; the connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I have also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
> >> When connecting to ES, the job fails with "findAndCreateTableSink failed".
> >> Could this be caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.
> >> 
> >> Caused by: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
> >> Reason: Required context properties mismatch
> >> 
> >> 
> >> 
> >> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> >> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
> >> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch
> >> 
> >> 
> >> def area_cnts():
> >>     s_env = StreamExecutionEnvironment.get_execution_environment()
> >>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> >>     s_env.set_parallelism(1)
> >> 
> >>     # use blink table planner
> >>     st_env = StreamTableEnvironment \
> >>         .create(s_env, environment_settings=EnvironmentSettings
> >>                 .new_instance()
> >>                 .in_streaming_mode()
> >>                 .use_blink_planner().build())
> >> 
> >>     # register source and sink
> >>     register_rides_source(st_env)
> >>     register_cnt_sink(st_env)
> >> 
> >>     # query
> >>     st_env.from_path("source")\
> >>         .group_by("taxiId")\
> >>         .select("taxiId, count(1) as cnt")\
> >>         .insert_into("sink")
> >> 
> >>     # execute
> >>     st_env.execute("6-write_with_elasticsearch")
> >> 
> >> 
> >> def register_rides_source(st_env):
> >>     st_env \
> >>         .connect(  # declare the external system to connect to
> >>         Kafka()
> >>             .version("universal")
> >>             .topic("Rides")
> >>             .start_from_earliest()
> >>             .property("zookeeper.connect", "zookeeper:2181")
> >>             .property("bootstrap.servers", "kafka:9092")) \
> >>         .with_format(  # declare a format for this system
> >>         Json()
> >>             .fail_on_missing_field(True)
> >>             .schema(DataTypes.ROW([
> >>             DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> >>             DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> >>             DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> >>             DataTypes.FIELD("lon", DataTypes.FLOAT()),
> >>             DataTypes.FIELD("lat", DataTypes.FLOAT()),
> >>             DataTypes.FIELD("psgCnt", DataTypes.INT()),
> >>             DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> >>         .with_schema(  # declare the schema of the table
> >>         Schema()
> >>             .field("rideId", DataTypes.BIGINT())
> >>             .field("taxiId", DataTypes.BIGINT())
> >>             .field("isStart", DataTypes.BOOLEAN())
> >>             .field("lon", DataTypes.FLOAT())
> >>             .field("lat", DataTypes.FLOAT())
> >>             .field("psgCnt", DataTypes.INT())
> >>             .field("rideTime", DataTypes.TIMESTAMP())
> >>             .rowtime(
> >>             Rowtime()
> >>                 .timestamps_from_field("eventTime")
> >>                 .watermarks_periodic_bounded(60000))) \
> >>         .in_append_mode() \
> >>         .register_table_source("source")
> >> 
> >> 
> >> def register_cnt_sink(st_env):
> >>     st_env.connect(
> >>         Elasticsearch()
> >>             .version("6")
> >>             .host("elasticsearch", 9200, "http")
> >>             .index("taxiid-cnts")
> >>             .document_type('taxiidcnt')
> >>             .key_delimiter("$")) \
> >>         .with_schema(
> >>             Schema()
> >>                 .field("taxiId", DataTypes.BIGINT())
> >>                 .field("cnt", DataTypes.BIGINT())) \
> >>         .with_format(
> >>            Json()
> >>                .derive_schema()) \
> >>         .in_upsert_mode() \
> >>         .register_table_sink("sink")
> >> 
> >> 
> >> if __name__ == '__main__':
> >>     area_cnts()
> >> 
> >



Re:Re: Problem connecting PyFlink to Elasticsearch 5.4

Posted by jack <ws...@163.com>.
I have already changed the version in the connect section to 5 locally, and got the error below:
>>     st_env.connect(
>>         Elasticsearch()
>>             .version("5")
>>             .host("localhost", 9200, "http")
>>             .index("taxiid-cnts")
>>             .document_type('taxiidcnt')
>>             .key_delimiter("$")) \

At 2020-06-16 15:38:28, "Dian Fu" <di...@gmail.com> wrote:
>I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
>
>> On Jun 16, 2020, at 1:47 PM, jack <ws...@163.com> wrote:
>> 
>> I am working from an example that connects PyFlink to Elasticsearch. The ES version on my side is 5.4.1 and PyFlink is 1.10.1; the connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I have also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
>> When connecting to ES, the job fails with "findAndCreateTableSink failed".
>> Could this be caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.
>> 
>> Caused by: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
>> Reason: Required context properties mismatch
>> 
>> 
>> 
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch
>> 
>> 
>> def area_cnts():
>>     s_env = StreamExecutionEnvironment.get_execution_environment()
>>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>     s_env.set_parallelism(1)
>> 
>>     # use blink table planner
>>     st_env = StreamTableEnvironment \
>>         .create(s_env, environment_settings=EnvironmentSettings
>>                 .new_instance()
>>                 .in_streaming_mode()
>>                 .use_blink_planner().build())
>> 
>>     # register source and sink
>>     register_rides_source(st_env)
>>     register_cnt_sink(st_env)
>> 
>>     # query
>>     st_env.from_path("source")\
>>         .group_by("taxiId")\
>>         .select("taxiId, count(1) as cnt")\
>>         .insert_into("sink")
>> 
>>     # execute
>>     st_env.execute("6-write_with_elasticsearch")
>> 
>> 
>> def register_rides_source(st_env):
>>     st_env \
>>         .connect(  # declare the external system to connect to
>>         Kafka()
>>             .version("universal")
>>             .topic("Rides")
>>             .start_from_earliest()
>>             .property("zookeeper.connect", "zookeeper:2181")
>>             .property("bootstrap.servers", "kafka:9092")) \
>>         .with_format(  # declare a format for this system
>>         Json()
>>             .fail_on_missing_field(True)
>>             .schema(DataTypes.ROW([
>>             DataTypes.FIELD("rideId", DataTypes.BIGINT()),
>>             DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
>>             DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
>>             DataTypes.FIELD("lon", DataTypes.FLOAT()),
>>             DataTypes.FIELD("lat", DataTypes.FLOAT()),
>>             DataTypes.FIELD("psgCnt", DataTypes.INT()),
>>             DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
>>         .with_schema(  # declare the schema of the table
>>         Schema()
>>             .field("rideId", DataTypes.BIGINT())
>>             .field("taxiId", DataTypes.BIGINT())
>>             .field("isStart", DataTypes.BOOLEAN())
>>             .field("lon", DataTypes.FLOAT())
>>             .field("lat", DataTypes.FLOAT())
>>             .field("psgCnt", DataTypes.INT())
>>             .field("rideTime", DataTypes.TIMESTAMP())
>>             .rowtime(
>>             Rowtime()
>>                 .timestamps_from_field("eventTime")
>>                 .watermarks_periodic_bounded(60000))) \
>>         .in_append_mode() \
>>         .register_table_source("source")
>> 
>> 
>> def register_cnt_sink(st_env):
>>     st_env.connect(
>>         Elasticsearch()
>>             .version("6")
>>             .host("elasticsearch", 9200, "http")
>>             .index("taxiid-cnts")
>>             .document_type('taxiidcnt')
>>             .key_delimiter("$")) \
>>         .with_schema(
>>             Schema()
>>                 .field("taxiId", DataTypes.BIGINT())
>>                 .field("cnt", DataTypes.BIGINT())) \
>>         .with_format(
>>            Json()
>>                .derive_schema()) \
>>         .in_upsert_mode() \
>>         .register_table_sink("sink")
>> 
>> 
>> if __name__ == '__main__':
>>     area_cnts()
>> 
>


Re: Problem connecting PyFlink to Elasticsearch 5.4

Posted by Dian Fu <di...@gmail.com>.
I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
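
One quick way to check which versions a given connector jar can actually serve is to look at the table factories it registers via Java SPI. A small local sketch (the jar path is an assumption; adjust it to wherever the jar actually lives):

# List the TableFactory implementations the jar announces; in Flink 1.10 these
# are registered under META-INF/services, and the ES version a factory supports
# is part of its required context, so a "5" jar cannot match version "6".
import zipfile

jar_path = "flink-sql-connector-elasticsearch5_2.11-1.10.1.jar"  # hypothetical path
services = "META-INF/services/org.apache.flink.table.factories.TableFactory"
with zipfile.ZipFile(jar_path) as jar:
    print(jar.read(services).decode("utf-8"))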

> On Jun 16, 2020, at 1:47 PM, jack <ws...@163.com> wrote:
> 
> I am working from an example that connects PyFlink to Elasticsearch. The ES version on my side is 5.4.1 and PyFlink is 1.10.1; the connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I have also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
> When connecting to ES, the job fails with "findAndCreateTableSink failed".
> Could this be caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.
> 
> Caused by: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
> Reason: Required context properties mismatch
> 
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch
> 
> 
> def area_cnts():
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     s_env.set_parallelism(1)
> 
>     # use blink table planner
>     st_env = StreamTableEnvironment \
>         .create(s_env, environment_settings=EnvironmentSettings
>                 .new_instance()
>                 .in_streaming_mode()
>                 .use_blink_planner().build())
> 
>     # register source and sink
>     register_rides_source(st_env)
>     register_cnt_sink(st_env)
> 
>     # query
>     st_env.from_path("source")\
>         .group_by("taxiId")\
>         .select("taxiId, count(1) as cnt")\
>         .insert_into("sink")
> 
>     # execute
>     st_env.execute("6-write_with_elasticsearch")
> 
> 
> def register_rides_source(st_env):
>     st_env \
>         .connect(  # declare the external system to connect to
>         Kafka()
>             .version("universal")
>             .topic("Rides")
>             .start_from_earliest()
>             .property("zookeeper.connect", "zookeeper:2181")
>             .property("bootstrap.servers", "kafka:9092")) \
>         .with_format(  # declare a format for this system
>         Json()
>             .fail_on_missing_field(True)
>             .schema(DataTypes.ROW([
>             DataTypes.FIELD("rideId", DataTypes.BIGINT()),
>             DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
>             DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
>             DataTypes.FIELD("lon", DataTypes.FLOAT()),
>             DataTypes.FIELD("lat", DataTypes.FLOAT()),
>             DataTypes.FIELD("psgCnt", DataTypes.INT()),
>             DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
>         .with_schema(  # declare the schema of the table
>         Schema()
>             .field("rideId", DataTypes.BIGINT())
>             .field("taxiId", DataTypes.BIGINT())
>             .field("isStart", DataTypes.BOOLEAN())
>             .field("lon", DataTypes.FLOAT())
>             .field("lat", DataTypes.FLOAT())
>             .field("psgCnt", DataTypes.INT())
>             .field("rideTime", DataTypes.TIMESTAMP())
>             .rowtime(
>             Rowtime()
>                 .timestamps_from_field("eventTime")
>                 .watermarks_periodic_bounded(60000))) \
>         .in_append_mode() \
>         .register_table_source("source")
> 
> 
> def register_cnt_sink(st_env):
>     st_env.connect(
>         Elasticsearch()
>             .version("6")
>             .host("elasticsearch", 9200, "http")
>             .index("taxiid-cnts")
>             .document_type('taxiidcnt')
>             .key_delimiter("$")) \
>         .with_schema(
>             Schema()
>                 .field("taxiId", DataTypes.BIGINT())
>                 .field("cnt", DataTypes.BIGINT())) \
>         .with_format(
>            Json()
>                .derive_schema()) \
>         .in_upsert_mode() \
>         .register_table_sink("sink")
> 
> 
> if __name__ == '__main__':
>     area_cnts()
> 

