You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bahir.apache.org by "yuemeng (Jira)" <ji...@apache.org> on 2019/11/19 07:38:00 UTC

[jira] [Updated] (BAHIR-220) Add redis descriptor for flink sql environment

     [ https://issues.apache.org/jira/browse/BAHIR-220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated BAHIR-220:
--------------------------
    Description: 
currently, for Flink-1.9.0, we can use the catalog to store our stream table source and sink meta.
for Redis connector, it should exist a Redis table sink so we can register it to catalog, and use Redis as a table in SQL environment

{code}
Redis redis = new Redis()
                .mode(RedisVadidator.REDIS_CLUSTER)
                .command(RedisCommand.INCRBY_EX.name())
                .ttl(100000)
                .property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);
tableEnvironment
                .connect(redis).withSchema(new Schema()
                .field("k", TypeInformation.of(String.class))
                .field("v", TypeInformation.of(Long.class)))
                .registerTableSink("redis");
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
env.execute("Test Redis Table");

{code}




  was:
currently, for Flink-1.9.0, we can use the catalog to store our stream table source and sink meta.
for Redis connector, it should exist a Redis table sink so we can register it to catalog, and use Redis as a table in SQL environment

{code}
Redis redis = new Redis()
                .mode(RedisVadidator.REDIS_CLUSTER)
                .command(RedisCommand.INCRBY_EX.name())
                .ttl(100000)
                .property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);

        tableEnvironment
                .connect(redis).withSchema(new Schema()
                .field("k", TypeInformation.of(String.class))
                .field("v", TypeInformation.of(Long.class)))
                .registerTableSink("redis");


        tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
        env.execute("Test Redis Set Data Type With TTL");



{code}





> Add redis descriptor for flink sql environment
> ----------------------------------------------
>
>                 Key: BAHIR-220
>                 URL: https://issues.apache.org/jira/browse/BAHIR-220
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: yuemeng
>            Priority: Major
>
> currently, for Flink-1.9.0, we can use the catalog to store our stream table source and sink meta.
> for Redis connector, it should exist a Redis table sink so we can register it to catalog, and use Redis as a table in SQL environment
> {code}
> Redis redis = new Redis()
>                 .mode(RedisVadidator.REDIS_CLUSTER)
>                 .command(RedisCommand.INCRBY_EX.name())
>                 .ttl(100000)
>                 .property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);
> tableEnvironment
>                 .connect(redis).withSchema(new Schema()
>                 .field("k", TypeInformation.of(String.class))
>                 .field("v", TypeInformation.of(Long.class)))
>                 .registerTableSink("redis");
> tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
> env.execute("Test Redis Table");
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)