Posted to user@flink.apache.org by 淘宝龙安 <re...@gmail.com> on 2019/11/05 05:53:40 UTC

How can I update data with flink-connector-elasticsearch6_2.11 in a join scenario?

Hi all,

I register two tables:

user_info
----------------------
| user_id  | varchar |
| name     | varchar |
----------------------


user_order
----------------------
| order_id | varchar |
| user_id  | varchar |
| price    | varchar |
----------------------


Then I use the Flink Table & SQL API to join these tables:

"select user_info.user_id as user_id, name, price, order_id from user_info
join user_order on user_order.user_id = user_info.user_id"


Finally, I emit the joined data to an Elasticsearch cluster.

Then I run my Flink program and insert two users into user_info and one
order into user_order.
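
Concretely, the socket input looks like this (the user ids and names below
are placeholders; only order_id 111 and price 23.00 come from my actual run):

    port 9000 (user_info):   1,Jack
                             2,Rose
    port 9001 (user_order):  111,1,23.00

so the join should produce one row: (user_id=1, name=Jack, price=23.00,
order_id=111).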


[screenshots of the two user_info inserts and the user_order insert]


The result in Elasticsearch:

[screenshot of the joined Elasticsearch document]

My questions:

1. How can I update the price? When I insert another record into
user_order that should update the price from 23.00 to 46.00 (order_id:
111), it does not work correctly.

[screenshot of the second user_order insert]

Instead, I get two documents:

[screenshot of the two resulting Elasticsearch documents]

It seems this program does not define the unique key fields, but I cannot
find the relevant information in the Flink documentation. The source code
says: "If the table does not have a key and is append-only, the keys
attribute is null." However, that does not work in the join scenario.
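
Do I need to rewrite the query with a GROUP BY so that the planner can
derive a unique key for the upsert sink? A minimal sketch of what I mean,
using fsTableEnv and the "output" sink from my code below (max(price) is
only a stand-in aggregate here, since price is a varchar; I have not
confirmed this is the intended approach):

String keyedJoinSql = "select user_info.user_id as user_id, name, max(price) as price, order_id "
        + "from user_info join user_order on user_order.user_id = user_info.user_id "
        + "group by user_info.user_id, name, order_id";
// with the grouping, (user_id, name, order_id) can serve as the unique key,
// so the Elasticsearch sink could update the document for order 111 in place
fsTableEnv.sqlUpdate("insert into output " + keyedJoinSql);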


2. The data actually comes from Kafka, synced from the MySQL binlog. I
submit my Flink job at 2019-11-05 21:00:00, and the Kafka offsets start
from that time (kafkaConsumer011.setStartFromGroupOffsets(), not
setStartFromEarliest()). How can I join against users that already exist in
MySQL but never appear in the Kafka stream?
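
For reference, the consumer setup looks roughly like this (class name,
topic, group id, and servers are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ConsumerSetup {
    static FlinkKafkaConsumer011<String> createConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "user-info-job");           // placeholder
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "user_info_binlog", new SimpleStringSchema(), props); // placeholder topic
        // start from the committed group offsets: binlog records produced
        // before the job's first offsets are never read by this job
        consumer.setStartFromGroupOffsets();
        // consumer.setStartFromEarliest(); // would replay the whole topic
        return consumer;
    }
}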




Thanks.

My code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

public class TestTwoStreamJoin {

    @Test
    void testTwoStreamJoin() throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // user_info arrives as "user_id,name" lines on port 9000
        DataStream<String> d1 = fsEnv.socketTextStream("localhost", 9000);
        String[] fieldNames = new String[]{"user_id", "name"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
        DataStream<Row> t1 = getRows(d1, fieldNames, types);

        // user_order arrives as "order_id,user_id,price" lines on port 9001
        DataStream<String> d2 = fsEnv.socketTextStream("localhost", 9001);
        String[] field2 = new String[]{"order_id", "user_id", "price"};
        TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> t2 = getRows(d2, field2, types2);

        fsTableEnv.registerTable("user_info", fsTableEnv.fromDataStream(t1));
        fsTableEnv.registerTable("user_order", fsTableEnv.fromDataStream(t2));

        String joinSql = "select user_info.user_id as user_id, name, price, order_id "
                + "from user_info join user_order on user_order.user_id = user_info.user_id";
        Table t3 = fsTableEnv.sqlQuery(joinSql);
        fsTableEnv.toAppendStream(t3, Row.class).print();

        String[] outputFields = new String[]{"user_id", "name", "price", "order_id"};
        TypeInformation[] outputTypes = new TypeInformation[]{
                Types.STRING, Types.STRING, Types.STRING, Types.STRING};

        // register an Elasticsearch 6 sink in upsert mode
        fsTableEnv.connect(new Elasticsearch()
                .version("6")
                .host("127.0.0.1", 9200, "http")
                .index("test_index")
                .documentType("user")
                .failureHandlerIgnore()
                .bulkFlushInterval(1000)
                .bulkFlushMaxActions(1))
            .withSchema(new Schema().schema(new TableSchema(outputFields, outputTypes)))
            .withFormat(new Json().schema(new RowTypeInfo(outputTypes, outputFields)))
            .inUpsertMode()
            .registerTableSink("output");

        fsTableEnv.sqlUpdate("insert into output " + joinSql);
        fsTableEnv.execute("job");
    }

    // parse comma-separated lines into Rows with the given field names and types
    public DataStream<Row> getRows(DataStream<String> dataStream, String[] f, TypeInformation[] t) {
        return dataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] v = value.split(",");
                Row r = new Row(v.length);
                for (int i = 0; i < v.length; i++) {
                    r.setField(i, v[i]);
                }
                return r;
            }
        }).returns(new RowTypeInfo(t, f));
    }
}
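
To feed the two streams, something must already be listening on ports 9000
and 9001 before the job starts; netcat works, e.g. "nc -l 9000" and
"nc -l 9001" (on some systems "nc -l -p <port>"), then typing in the
comma-separated lines shown above.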