You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/09 16:59:11 UTC

[5/9] flink git commit: [hotfix] Don't use deprecated writeToKafkaWithTimestamps in Kafka 0.10 tests

[hotfix] Don't use deprecated writeToKafkaWithTimestamps in Kafka 0.10 tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a3621b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a3621b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a3621b8

Branch: refs/heads/master
Commit: 9a3621b842d2bf6b76e394f1412dd27475180ac2
Parents: 08bfdae
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 28 14:53:24 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a3621b8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5a5caad..d0e935b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -168,7 +168,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
-		return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props);
+		prod.setFlushOnCheckpoint(true);
+		prod.setWriteTimestampToKafka(true);
+		return stream.addSink(prod);
 	}
 
 	@Override