You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Curtis Jensen <cu...@gmail.com> on 2023/01/27 18:17:14 UTC
Multiple Window Streams to same Kinesis Sink
I'm trying to sink two Window Streams to the same Kinesis Sink. When
I do this, no results are making it to the sink (code below). If I
remove one of the windows from the Job, results do get published.
Adding a second windowed stream to the same sink seems to stop output from both.
How can I have results from both Window Streams go to the same sink?
Thanks
/**
 * Builds and runs the Flink job: reads JSON strings from the Kinesis source,
 * aggregates them per {@code accountId} over one-minute and two-minute
 * tumbling processing-time windows, and writes both results to the same
 * Kinesis sink.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job execution fails; the failure is logged and rethrown
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    ObjectMapper jsonParser = new ObjectMapper();
    DataStream<String> inputStream = createKinesisSource(env);
    FlinkKinesisProducer<String> kinesisSink = createKinesisSink();

    // Both window sizes share one pipeline definition; only the window length
    // and the label handed to the aggregator differ.
    addWindowedLoginSink(inputStream, jsonParser, Time.minutes(1), "k1m", kinesisSink);
    addWindowedLoginSink(inputStream, jsonParser, Time.minutes(2), "k2m", kinesisSink);

    try {
        env.execute("Flink Kinesis Streaming Sink Job");
    } catch (Exception e) {
        // Pass the throwable itself so the logger prints the message and the
        // full stack trace; calling toString() on the StackTraceElement[]
        // array only prints the array's identity, not the frames.
        LOG.error("failed", e);
        throw e;
    }
}

/**
 * Attaches one keyed, tumbling processing-time window aggregation to
 * {@code input} and routes its output to {@code sink}.
 *
 * <p>The pipeline is chained end to end so that every stage stays generically
 * typed instead of passing through a raw {@code WindowedStream} variable.
 * NOTE(review): this assumes {@code LoginAggregator} takes {@code JsonNode}
 * input and produces {@code String} output - confirm against its declaration.
 *
 * @param input raw JSON records, one document per element
 * @param jsonParser mapper used to parse each record into a {@code JsonNode}
 * @param windowSize length of the tumbling window
 * @param aggregatorLabel label passed to the {@code LoginAggregator} constructor
 * @param sink sink that receives the aggregated records
 */
private static void addWindowedLoginSink(
        DataStream<String> input,
        ObjectMapper jsonParser,
        Time windowSize,
        String aggregatorLabel,
        FlinkKinesisProducer<String> sink) {
    input
            .map(value -> jsonParser.readValue(value, JsonNode.class))
            .keyBy(node -> node.get("accountId"))
            .window(TumblingProcessingTimeWindows.of(windowSize))
            .aggregate(new LoginAggregator(aggregatorLabel))
            .addSink(sink);
}
/**
 * Registers a Kinesis consumer on the given environment and returns the
 * resulting stream of string records.
 *
 * <p>The consumer reads {@code inputStreamName} in the configured
 * {@code region}, with the initial stream position set to {@code "LATEST"}.
 *
 * @param env execution environment the source is added to
 * @return stream of records deserialized with {@code SimpleStringSchema}
 */
private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env) {
    final Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);

    final FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), consumerConfig);
    return env.addSource(consumer);
}
/**
 * Creates the Kinesis producer used as the job's sink.
 *
 * <p>The producer is configured with the shared {@code region}, has the
 * {@code "AggregationEnabled"} property set to {@code "false"}, writes to
 * {@code outputStreamName} by default, and uses a random UUID, generated once
 * per call to this method, as its default partition.
 *
 * @return a configured producer that serializes records with {@code SimpleStringSchema}
 */
private static FlinkKinesisProducer<String> createKinesisSink() {
    final Properties producerConfig = new Properties();
    producerConfig.setProperty("AggregationEnabled", "false");
    producerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);

    // The key is generated here, once, and then fixed on the returned sink.
    final String partitionKey = UUID.randomUUID().toString();

    final FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    producer.setDefaultPartition(partitionKey);
    producer.setDefaultStream(outputStreamName);
    return producer;
}