You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Edward Bingham <ed...@siden.io> on 2021/04/14 22:08:14 UTC
Flink 1.11 FlinkKafkaConsumer not propagating watermarks
Hi everyone,
I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
Flink processors using Flink 1.12, and tried to get them working on Amazon
EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
went to downgrade, I found, inexplicably, that watermarks were no longer
propagating.
There is only one partition on the topic, and parallelism is set to 1. Is
there something I'm missing here? I feel like I'm going a bit crazy.
I've cross-posted this on Stack Overflow, but I figure the mailing list is
probably a better avenue for this question.
Thanks,
Ned
Here's the output for Flink 1.12 (correctly propagating the watermark):
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
Emitting watermark 9223372036768375807
And here is the output for Flink 1.11 (not propagating the watermark):
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807
Here's the integration test that exposes it:
package mytest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.*;
public class FailTest {
private static EmbeddedZookeeper zooKeeper = null;
private static KafkaServer server = null;
public static AdminClient admin = null;
private static int connected = 0;
private static StringSerializer stringSerializer = new StringSerializer();
private static StringDeserializer stringDeserializer = new
StringDeserializer();
private static final Properties ZooKeeperProperties =
getZooKeeperProperties();
private static final Properties ServerProperties =
getServerProperties();
private static final Properties ProducerProperties =
getProducerProperties();
public static StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
public static Properties getProducerProperties() {
// Use Kafka provided properties
Properties result = new Properties();
result.put("bootstrap.servers", "localhost:9092");
result.put("compression.type", "none");
return result;
}
public static Properties getServerProperties() {
// Use Kafka provided properties
Properties result = new Properties();
result.put("broker.id", "0");
result.put("num.network.threads", "3");
result.put("num.io.threads", "8");
result.put("socket.send.buffer.bytes", "102400");
result.put("socket.recv.buffer.bytes", "102400");
result.put("log.dirs", "target/kafka-logs");
result.put("num.partitions", "1");
result.put("offset.topic.replication.factor", "1");
result.put("transaction.state.log.replication.factor", "1");
result.put("transaction.state.log.min.isr", "1");
result.put("auto.create.topics.enable", "true");
result.put("log.retention.hours", "168");
result.put("log.segment.bytes", "1073741824");
result.put("log.retention.check.interval.ms", "300000");
result.put("zookeeper.connect", "localhost:2181");
result.put("zookeeper.connection.timeout.ms", "18000");
result.put("group.initial.rebalance.delay.ms", "0");
return result;
}
public static Properties getZooKeeperProperties() {
// Use Kafka provided properties
Properties result = new Properties();
result.put("dataDir", "/tmp/zookeeper");
result.put("clientPort", "2181");
result.put("maxClientCnxns", "0");
result.put("admin.enableServer", "false");
return result;
}
private static Properties getNewLogDir(Properties props) {
String path = props.getProperty("log.dirs");
path = path + "/run.";
int index = 0;
boolean done = false;
while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
index += 1;
}
props.setProperty("log.dirs", path + String.valueOf(index));
return props;
}
public static class Print<V> extends ProcessFunction<V, V> {
private static final ObjectMapper mapper = new ObjectMapper();
public String prefix;
public Print() {
this.prefix = "";
}
public Print(String prefix) {
this.prefix = prefix;
}
@Override
public void processElement(V value, Context ctx, Collector<V> out) {
System.out.printf("%s ", prefix);
if (ctx != null) {
TimerService srv = ctx.timerService();
Long timestampLong = ctx.timestamp();
long timestamp = 0;
if (timestampLong != null) {
timestamp = timestampLong;
}
long watermark = 0;
if (srv != null) {
watermark = srv.currentWatermark();
}
System.out.printf("[timestamp=%d watermark=%d] ",
timestamp, watermark);
}
if (value == null) {
System.out.println("null");
} else {
try {
System.out.println(new
String(mapper.writeValueAsBytes(value)));
} catch (Exception e) {
System.out.println("exception");
e.printStackTrace();
}
}
out.collect(value);
}
}
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@BeforeClass
public static void setup() {
env.setParallelism(1);
if (connected == 0) {
zooKeeper = new EmbeddedZookeeper();
ServerProperties.setProperty("zookeeper.connect",
"localhost:" + zooKeeper.port());
server = TestUtils.createServer(new
KafkaConfig(getNewLogDir(ServerProperties)), new MockTime());
admin = AdminClient.create(ProducerProperties);
}
connected += 1;
}
@AfterClass
public static void tearDown() {
if (connected == 1) {
try {
server.shutdown();
zooKeeper.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
zooKeeper = null;
server = null;
admin = null;
}
connected -= 1;
}
@Test
public void testFail() throws Exception {
String inputTopic = "input";
Map<String, String> configs = new HashMap<>();
int partitions = 1;
short replication = 1;
CreateTopicsResult result = admin.createTopics(Arrays.asList(
new NewTopic(inputTopic, partitions, replication).configs(configs)
));
result.all().get();
KafkaProducer<String, String> producer = new
KafkaProducer<String, String>(ProducerProperties, stringSerializer,
stringSerializer);
;
DescribeTopicsResult topics =
admin.describeTopics(Arrays.asList(inputTopic));
for (Map.Entry<String, TopicDescription> topic :
topics.all().get().entrySet()) {
System.out.printf("%s %d\n", topic.getValue().name(),
topic.getValue().partitions().size());
System.out.println(topic.getValue().toString());
}
// Some subscription events
producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(1).toMilliseconds(), "0", "test message"));
producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(10).toMilliseconds(), "0", "test message"));
producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(100).toMilliseconds(), "0", "test message"));
producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(1000).toMilliseconds(), "0", "test message"));
producer.send(new ProducerRecord<String, String>(inputTopic,
0, Long.MAX_VALUE, "0", "test message"));
producer.flush();
producer.close();
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put("group.id", "0");
prop.put("enable.auto.commit", "true");
prop.put("auto.commit.interval.ms", "1000");
prop.put("session.timeout.ms", "30000");
FlinkKafkaConsumer<String> source = new
FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),
prop);
source.assignTimestampsAndWatermarks(
new WatermarkStrategy<String>() {
@Override
public TimestampAssigner<String>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new TimestampAssigner<String>() {
@Override
public long extractTimestamp(String event,
long recordTimestamp) {
System.out.printf("Assigning timestamp
%d\n", recordTimestamp);
return recordTimestamp;
}
};
}
@Override
public WatermarkGenerator<String>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<String>() {
public long latestWatermark = Long.MIN_VALUE;
@Override
public void onEvent(String event, long
timestamp, WatermarkOutput output) {
long eventWatermark = timestamp -
Time.days(1).toMilliseconds();
if (eventWatermark > latestWatermark) {
System.out.printf("Emitting watermark
%d\n", eventWatermark);
output.emitWatermark(new
Watermark(eventWatermark));
latestWatermark = eventWatermark;
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
};
}
});
source.setStartFromEarliest();
env.addSource(source)
.process(new Print<String>("Source"));
System.out.println(env.getExecutionPlan());
JobClient client = null;
try {
client = env.executeAsync("Fail Test");
} catch (Exception e) {
e.printStackTrace();
throw e;
}
topics = admin.describeTopics(Arrays.asList(inputTopic));
for (Map.Entry<String, TopicDescription> topic :
topics.all().get().entrySet()) {
System.out.printf("%s %d\n", topic.getValue().name(),
topic.getValue().partitions().size());
System.out.println(topic.getValue().toString());
}
TimeUnit.SECONDS.sleep(5);
client.cancel().get(5, TimeUnit.SECONDS);
}
}
Re: Flink 1.11 FlinkKafkaConsumer not propagating watermarks
Posted by Arvid Heise <ar...@apache.org>.
For reference: this was self-answered on Stack Overflow [1]:
> Turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and
> deprecates the whole TimeCharacteristic flow. So to downgrade to Flink
> 1.11, you must add the following statement to configure the
> StreamExecutionEnvironment.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
[1] https://stackoverflow.com/a/67111541/10299342
On Thu, Apr 15, 2021 at 12:08 AM Edward Bingham <ed...@siden.io>
wrote:
> Hi everyone,
>
> I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
> Flink processors using Flink 1.12, and tried to get them working on Amazon
> EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
> went to downgrade, I found, inexplicably, that watermarks were no longer
> propagating.
>
> There is only one partition on the topic, and parallelism is set to 1. Is
> there something I'm missing here? I feel like I'm going a bit crazy.
>
> I've cross-posted this on stackoverflow, but I figure the mailing list is
> probably a better avenue for this question.
>
> Thanks,
> Ned
>
>
> Here's the output for Flink 1.12 (correctly propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
> "nodes" : [ {
> "id" : 1,
> "type" : "Source: Custom Source",
> "pact" : "Data Source",
> "contents" : "Source: Custom Source",
> "parallelism" : 1
> }, {
> "id" : 2,
> "type" : "Process",
> "pact" : "Operator",
> "contents" : "Process",
> "parallelism" : 1,
> "predecessors" : [ {
> "id" : 1,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=864000000 watermark=0] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=8640000000 watermark=777600000] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=86400000000 watermark=8553600000] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
> Emitting watermark 9223372036768375807
>
> And here is the output for Flink 1.11 (not propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
> "nodes" : [ {
> "id" : 1,
> "type" : "Source: Custom Source",
> "pact" : "Data Source",
> "contents" : "Source: Custom Source",
> "parallelism" : 1
> }, {
> "id" : 2,
> "type" : "Process",
> "pact" : "Operator",
> "contents" : "Process",
> "parallelism" : 1,
> "predecessors" : [ {
> "id" : 1,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 9223372036768375807
>
> Here's the integration test that exposes it:
>
> package mytest;
> import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;
> import java.io.FileInputStream;import java.io.InputStream;import java.io.IOException;
> import java.nio.file.Files;import java.nio.file.Paths;
> import java.text.SimpleDateFormat;
> import java.util.Arrays;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Properties;
> import kafka.server.KafkaConfig;import kafka.server.KafkaServer;
> import kafka.utils.MockTime;import kafka.utils.TestUtils;
> import kafka.zk.EmbeddedZookeeper;
> import org.apache.flink.api.common.eventtime.TimestampAssigner;import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;import org.apache.flink.api.common.eventtime.Watermark;import org.apache.flink.api.common.eventtime.WatermarkGenerator;import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;import org.apache.flink.api.common.eventtime.WatermarkOutput;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.JobExecutionResult;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.core.execution.JobClient;import org.apache.flink.runtime.client.JobCancellationException;import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.streaming.api.functions.ProcessFunction.Context;import org.apache.flink.streaming.api.TimerService;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.test.util.MiniClusterWithClientResource;import org.apache.flink.util.Collector;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.CreateTopicsResult;import org.apache.kafka.clients.admin.DescribeTopicsResult;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.admin.TopicDescription;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.Serializer;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.streams.StreamsConfig;
> import org.junit.*;
> public class FailTest {
> private static EmbeddedZookeeper zooKeeper = null;
> private static KafkaServer server = null;
> public static AdminClient admin = null;
> private static int connected = 0;
>
> private static StringSerializer stringSerializer = new StringSerializer();
> private static StringDeserializer stringDeserializer = new StringDeserializer();
>
> private static final Properties ZooKeeperProperties = getZooKeeperProperties();
> private static final Properties ServerProperties = getServerProperties();
> private static final Properties ProducerProperties = getProducerProperties();
>
> public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> public static Properties getProducerProperties() {
> // Use Kafka provided properties
> Properties result = new Properties();
> result.put("bootstrap.servers", "localhost:9092");
> result.put("compression.type", "none");
> return result;
> }
>
> public static Properties getServerProperties() {
> // Use Kafka provided properties
> Properties result = new Properties();
> result.put("broker.id", "0");
> result.put("num.network.threads", "3");
> result.put("num.io.threads", "8");
> result.put("socket.send.buffer.bytes", "102400");
> result.put("socket.recv.buffer.bytes", "102400");
> result.put("log.dirs", "target/kafka-logs");
> result.put("num.partitions", "1");
> result.put("offset.topic.replication.factor", "1");
> result.put("transaction.state.log.replication.factor", "1");
> result.put("transaction.state.log.min.isr", "1");
> result.put("auto.create.topics.enable", "true");
> result.put("log.retention.hours", "168");
> result.put("log.segment.bytes", "1073741824");
> result.put("log.retention.check.interval.ms", "300000");
> result.put("zookeeper.connect", "localhost:2181");
> result.put("zookeeper.connection.timeout.ms", "18000");
> result.put("group.initial.rebalance.delay.ms", "0");
> return result;
> }
>
> public static Properties getZooKeeperProperties() {
> // Use Kafka provided properties
> Properties result = new Properties();
> result.put("dataDir", "/tmp/zookeeper");
> result.put("clientPort", "2181");
> result.put("maxClientCnxns", "0");
> result.put("admin.enableServer", "false");
> return result;
> }
>
> private static Properties getNewLogDir(Properties props) {
> String path = props.getProperty("log.dirs");
> path = path + "/run.";
> int index = 0;
> boolean done = false;
> while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
> index += 1;
> }
> props.setProperty("log.dirs", path + String.valueOf(index));
> return props;
> }
>
> public static class Print<V> extends ProcessFunction<V, V> {
> private static final ObjectMapper mapper = new ObjectMapper();
> public String prefix;
>
> public Print() {
> this.prefix = "";
> }
>
> public Print(String prefix) {
> this.prefix = prefix;
> }
>
> @Override
> public void processElement(V value, Context ctx, Collector<V> out) {
> System.out.printf("%s ", prefix);
> if (ctx != null) {
> TimerService srv = ctx.timerService();
> Long timestampLong = ctx.timestamp();
> long timestamp = 0;
> if (timestampLong != null) {
> timestamp = timestampLong;
> }
> long watermark = 0;
> if (srv != null) {
> watermark = srv.currentWatermark();
> }
> System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
> }
>
> if (value == null) {
> System.out.println("null");
> } else {
> try {
> System.out.println(new String(mapper.writeValueAsBytes(value)));
> } catch (Exception e) {
> System.out.println("exception");
> e.printStackTrace();
> }
> }
> out.collect(value);
> }
> }
>
> @ClassRule
> public static MiniClusterWithClientResource flinkCluster =
> new MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberSlotsPerTaskManager(2)
> .setNumberTaskManagers(1)
> .build());
>
> @BeforeClass
> public static void setup() {
> env.setParallelism(1);
> if (connected == 0) {
> zooKeeper = new EmbeddedZookeeper();
> ServerProperties.setProperty("zookeeper.connect", "localhost:" + zooKeeper.port());
>
> server = TestUtils.createServer(new KafkaConfig(getNewLogDir(ServerProperties)), new MockTime());
> admin = AdminClient.create(ProducerProperties);
> }
> connected += 1;
> }
>
> @AfterClass
> public static void tearDown() {
> if (connected == 1) {
> try {
> server.shutdown();
> zooKeeper.shutdown();
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> zooKeeper = null;
> server = null;
> admin = null;
> }
> connected -= 1;
> }
>
> @Test
> public void testFail() throws Exception {
> String inputTopic = "input";
>
> Map<String, String> configs = new HashMap<>();
> int partitions = 1;
> short replication = 1;
>
> CreateTopicsResult result = admin.createTopics(Arrays.asList(
> new NewTopic(inputTopic, partitions, replication).configs(configs)
> ));
> result.all().get();
>
> KafkaProducer<String, String> producer = new KafkaProducer<String, String>(ProducerProperties, stringSerializer, stringSerializer);
> ;
>
> DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(inputTopic));
> for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
> System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
> System.out.println(topic.getValue().toString());
> }
>
> // Some subscription events
> producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
> producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
> producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
> producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
> producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
> producer.flush();
> producer.close();
>
> Properties prop = new Properties();
> prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
> prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> prop.put("group.id", "0");
> prop.put("enable.auto.commit", "true");
> prop.put("auto.commit.interval.ms", "1000");
> prop.put("session.timeout.ms", "30000");
> FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop);
> source.assignTimestampsAndWatermarks(
> new WatermarkStrategy<String>() {
> @Override
> public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
> return new TimestampAssigner<String>() {
> @Override
> public long extractTimestamp(String event, long recordTimestamp) {
> System.out.printf("Assigning timestamp %d\n", recordTimestamp);
> return recordTimestamp;
> }
> };
> }
>
> @Override
> public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
> return new WatermarkGenerator<String>() {
> public long latestWatermark = Long.MIN_VALUE;
>
> @Override
> public void onEvent(String event, long timestamp, WatermarkOutput output) {
> long eventWatermark = timestamp - Time.days(1).toMilliseconds();
> if (eventWatermark > latestWatermark) {
> System.out.printf("Emitting watermark %d\n", eventWatermark);
> output.emitWatermark(new Watermark(eventWatermark));
> latestWatermark = eventWatermark;
> }
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput output) {
> }
> };
> }
> });
> source.setStartFromEarliest();
>
> env.addSource(source)
> .process(new Print<String>("Source"));
>
> System.out.println(env.getExecutionPlan());
> JobClient client = null;
> try {
> client = env.executeAsync("Fail Test");
> } catch (Exception e) {
> e.printStackTrace();
> throw e;
> }
>
> topics = admin.describeTopics(Arrays.asList(inputTopic));
> for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
> System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
> System.out.println(topic.getValue().toString());
> }
>
> TimeUnit.SECONDS.sleep(5);
> client.cancel().get(5, TimeUnit.SECONDS);
> }
> }
>
>