You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Curtis Jensen <cu...@gmail.com> on 2023/01/27 18:17:14 UTC

Multiple Window Streams to same Kinesis Sink

I'm trying to sink two windowed streams to the same Kinesis sink.  When
I do this, no results reach the sink (code below).  If I remove one of
the windows from the job, results do get published.  Adding a second
stream to the sink seems to suppress the output of both.

How can I have results from both Window Streams go to the same sink?

Thanks


/**
 * Builds and runs the streaming job: reads JSON strings from the Kinesis source,
 * keys them by their {@code accountId} field, aggregates them over one-minute and
 * two-minute tumbling processing-time windows, and sends both aggregate streams to
 * the Kinesis sink.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if job execution fails; the failure is logged and rethrown
 */
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    ObjectMapper jsonParser = new ObjectMapper();

    DataStream<String> inputStream = createKinesisSource(env);
    FlinkKinesisProducer<String> kinesisSink = createKinesisSink();

    // Parse each record once and feed both windows from the same parsed stream,
    // instead of building two identical map operators.
    DataStream<JsonNode> parsedStream = inputStream
            .map(value -> jsonParser.readValue(value, JsonNode.class));

    // The pipelines are chained end to end so no raw WindowedStream variable is
    // needed and the compiler checks the element types at every step.
    // NOTE(review): assumes LoginAggregator consumes JsonNode and emits String
    // (the sink's element type) - confirm against its declaration.
    parsedStream
            .keyBy(node -> node.get("accountId"))
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .aggregate(new LoginAggregator("k1m"))
            .addSink(kinesisSink);

    parsedStream
            .keyBy(node -> node.get("accountId"))
            .window(TumblingProcessingTimeWindows.of(Time.minutes(2)))
            .aggregate(new LoginAggregator("k2m"))
            .addSink(kinesisSink);

    try {
        env.execute("Flink Kinesis Streaming Sink Job");
    } catch (Exception e) {
        // Hand the exception itself to the logger so the message and the full stack
        // trace are recorded; calling toString() on the StackTraceElement[] only
        // yields the array's identity string.
        LOG.error("failed", e);

        throw e;
    }
}


/**
 * Adds a Kinesis consumer for {@code inputStreamName} to the given environment.
 * The consumer is configured with the {@code region} value and an initial stream
 * position of {@code "LATEST"}, and deserializes records with a
 * {@link SimpleStringSchema}.
 *
 * @param env the execution environment the source is registered on
 * @return the stream of strings produced by the Kinesis consumer
 */
private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env) {
    final Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    final FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), consumerConfig);

    return env.addSource(consumer);
}

/**
 * Creates the Kinesis producer used as the job's sink. The producer is configured
 * with the {@code region} value and the {@code "AggregationEnabled"} property set
 * to {@code "false"}, serializes records with a {@link SimpleStringSchema}, and
 * writes to {@code outputStreamName} by default.
 *
 * @return the configured producer
 */
private static FlinkKinesisProducer<String> createKinesisSink() {
    final Properties producerConfig = new Properties();
    producerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    producerConfig.setProperty("AggregationEnabled", "false");

    final FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);

    // The default partition value is one random UUID generated here, once per call.
    // NOTE(review): presumably every record written through this producer then
    // shares that partition key - confirm this is intended.
    final String defaultPartition = UUID.randomUUID().toString();
    producer.setDefaultStream(outputStreamName);
    producer.setDefaultPartition(defaultPartition);

    return producer;
}