Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/13 08:24:15 UTC
[1/2] flink git commit: [hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)
Repository: flink
Updated Branches:
refs/heads/master 15df71ba9 -> 3d5bca0ab
[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/744f8ebb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/744f8ebb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/744f8ebb
Branch: refs/heads/master
Commit: 744f8ebb66b2a7288942be139cd7a7e6d1170c80
Parents: 15df71b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Oct 11 15:48:32 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Oct 12 14:03:14 2016 +0200
----------------------------------------------------------------------
.../kafka/KafkaTestEnvironmentImpl.java | 3 -
.../connectors/kafka/Kafka09ITCase.java | 9 -
.../connectors/kafka/KafkaConsumerTestBase.java | 242 +------------------
.../kafka/testutils/DataGenerators.java | 165 ++++++-------
4 files changed, 72 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
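In essence, the revert makes InfiniteStringsGenerator feed Kafka without submitting a Flink job: it builds the producer sink against a dummy DataStream, pulls the user function out of the sink operator, runs its lifecycle by hand, and pushes records straight into the operator. A condensed sketch of that inline pattern (MockStreamTransformation, DummyStreamExecutionEnvironment and MockRuntimeContext are the test helpers appearing in the diff below):

    // Sketch only: condensed from DataGenerators.InfiniteStringsGenerator below.
    Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
    props.setProperty("retries", "3");

    // Build the sink through the normal factory, but on a dummy DataStream,
    // so no job graph is ever submitted to a cluster.
    DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), new MockStreamTransformation());
    DataStreamSink<String> sink = server.produceIntoKafka(
            stream, topic,
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props, new FixedPartitioner<String>());

    // Extract the producer function from the sink operator and drive its
    // lifecycle manually on the generator thread.
    StreamSink<String> operator = sink.getTransformation().getOperator();
    RichFunction producer = (RichFunction) operator.getUserFunction();
    producer.setRuntimeContext(new MockRuntimeContext(1, 0));
    producer.open(new Configuration());

    // Records now go directly into the Kafka producer, no cluster involved.
    operator.processElement(new StreamRecord<>("some-record"));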
http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af6d254..78fc1c6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -115,9 +115,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
- /* FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
- sink.setFlushOnCheckpoint(true);
- return sink; */
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index fd167a0..b9ec18a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -35,15 +35,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
runSimpleConcurrentProducerConsumerTopology();
}
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumer() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(false);
-// }
-
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(true);
-// }
@Test(timeout = 60000)
public void testKeyValueSupport() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3c967ba..0810a3e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -31,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -92,7 +85,6 @@ import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
@@ -116,7 +108,6 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -517,7 +508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// launch a producer thread
DataGenerators.InfiniteStringsGenerator generator =
- new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
+ new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
generator.start();
// launch a consumer asynchronously
@@ -571,7 +562,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
assertTrue(failueCause.getMessage().contains("Job was cancelled"));
if (generator.isAlive()) {
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
generator.shutdown();
generator.join();
}
@@ -1723,234 +1713,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state;
}
}
-
- ///////////// Testing the Kafka consumer with embeded watermark generation functionality ///////////////
-
-// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-//
-// final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-//
-// final Map<String, Boolean> topics = new HashMap<>();
-// topics.put(topic1, false);
-// topics.put(topic2, emptyPartition);
-//
-// final int noOfTopcis = topics.size();
-// final int partitionsPerTopic = 1;
-// final int elementsPerPartition = 100 + 1;
-//
-// final int totalElements = emptyPartition ?
-// partitionsPerTopic * elementsPerPartition :
-// noOfTopcis * partitionsPerTopic * elementsPerPartition;
-//
-// createTestTopic(topic1, partitionsPerTopic, 1);
-// createTestTopic(topic2, partitionsPerTopic, 1);
-//
-// final StreamExecutionEnvironment env =
-// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-// env.setParallelism(partitionsPerTopic);
-// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-// env.getConfig().disableSysoutLogging();
-//
-// TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-//
-// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-// producerProperties.setProperty("retries", "0");
-//
-// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-//
-// List<String> topicTitles = new ArrayList<>(topics.keySet());
-// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-//
-// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-//
-// for(String topic: topicTitles) {
-// deleteTestTopic(topic);
-// }
-// }
-//
-// private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
-// try {
-// tryExecutePropagateExceptions(env, execName);
-// }
-// catch (ProgramInvocationException | JobExecutionException e) {
-// // look for NotLeaderForPartitionException
-// Throwable cause = e.getCause();
-//
-// // search for nested SuccessExceptions
-// int depth = 0;
-// while (cause != null && depth++ < 20) {
-// if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-// throw (Exception) cause;
-// }
-// cause = cause.getCause();
-// }
-// throw e;
-// }
-// }
-//
-// private void putDataInTopics(StreamExecutionEnvironment env,
-// Properties producerProperties,
-// final int elementsPerPartition,
-// Map<String, Boolean> topics,
-// TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-// if(topics.size() != 2) {
-// throw new RuntimeException("This method accepts two topics as arguments.");
-// }
-//
-// TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-// new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-//
-// DataStream<Tuple2<Long, Integer>> stream = env
-// .addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-// private boolean running = true;
-//
-// @Override
-// public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
-// int topic = 0;
-// int currentTs = 1;
-//
-// while (running && currentTs < elementsPerPartition) {
-// long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-// ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-// currentTs++;
-// }
-//
-// Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-// ctx.collect(toWrite2);
-// }
-//
-// @Override
-// public void cancel() {
-// running = false;
-// }
-// }).setParallelism(1);
-//
-// List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-//
-// stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-// @Override
-// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-// return value;
-// }
-// }).setParallelism(1);
-// kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
-// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
-//
-// if(!topicsL.get(1).getValue()) {
-// stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-// @Override
-// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-// long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-// return new Tuple2<>(timestamp, 1);
-// }
-// }).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
-// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-// }
-// }
-
- private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
- List<String> topics,
- final int totalElementsToExpect,
- TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
-
- TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
- new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
- FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
- .getConsumer(topics, sourceSchema, props)
- .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
-
- DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
-
- return consuming
- .transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
- .addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
-
- private int elementCount = 0;
-
- @Override
- public void invoke(Tuple2<Long, Integer> value) throws Exception {
- elementCount++;
- if (elementCount == totalElementsToExpect) {
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
- }
-
- /** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */
- private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {
-
- @Override
- public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
- return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
- }
-
- @Override
- public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
- return element.f0;
- }
- }
-
- private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
-
- private long lastReceivedWatermark = Long.MIN_VALUE;
-
- private Map<Integer, Boolean> isEligible = new HashMap<>();
- private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();
-
- WMTestingOperator() {
- isEligible = new HashMap<>();
- perPartitionMaxTs = new HashMap<>();
- }
-
- @Override
- public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception {
- int partition = element.getValue().f1;
- Long maxTs = perPartitionMaxTs.get(partition);
- if(maxTs == null || maxTs < element.getValue().f0) {
- perPartitionMaxTs.put(partition, element.getValue().f0);
- isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
- }
- output.collect(element);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- int partition = -1;
- long minTS = Long.MAX_VALUE;
- for (Integer part : perPartitionMaxTs.keySet()) {
- Long ts = perPartitionMaxTs.get(part);
- if (ts < minTS && isEligible.get(part)) {
- partition = part;
- minTS = ts;
- lastReceivedWatermark = ts;
- }
- }
- isEligible.put(partition, false);
-
- assertEquals(minTS, mark.getTimestamp());
- output.emitWatermark(mark);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- perPartitionMaxTs.clear();
- isEligible.clear();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 3f035fd..ba75212 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,74 +18,35 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import java.io.Serializable;
+import java.util.Collection;
import java.util.Properties;
import java.util.Random;
@SuppressWarnings("serial")
public class DataGenerators {
-
- public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
- KafkaTestEnvironment testServer, String topic,
- int numPartitions,
- final int from, final int to) throws Exception {
- TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
- new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int cnt = from;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
-
- while (running && cnt <= to) {
- ctx.collect(new Tuple2<>(partition, cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- testServer.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
- FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
- new Tuple2Partitioner(numPartitions)
- );
-
- env.execute("Data generator (Int, Int) stream to topic " + topic);
- }
-
- // ------------------------------------------------------------------------
-
public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
KafkaTestEnvironment testServer, String topic,
final int numPartitions,
@@ -105,9 +66,9 @@ public class DataGenerators {
// create a sequence
int[] elements = new int[numElements];
for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
- i < numElements;
- i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-
+ i < numElements;
+ i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
elements[i] = val;
}
@@ -116,7 +77,7 @@ public class DataGenerators {
Random rnd = new Random();
for (int i = 0; i < elements.length; i++) {
int otherPos = rnd.nextInt(elements.length);
-
+
int tmp = elements[i];
elements[i] = elements[otherPos];
elements[otherPos] = tmp;
@@ -142,7 +103,7 @@ public class DataGenerators {
if(secureProps != null) {
props.putAll(testServer.getSecureProperties());
}
-
+
stream = stream.rebalance();
testServer.produceIntoKafka(stream, topic,
new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
@@ -156,63 +117,55 @@ public class DataGenerators {
env.execute("Scrambles int sequence generator");
}
-
+
// ------------------------------------------------------------------------
-
- public static class InfiniteStringsGenerator extends Thread implements Serializable {
- private transient KafkaTestEnvironment server;
-
- private final String topic;
+ public static class InfiniteStringsGenerator extends Thread {
- private final int flinkPort;
+ private final KafkaTestEnvironment server;
+
+ private final String topic;
private volatile Throwable error;
-
+
private volatile boolean running = true;
-
- public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) {
+
+ public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
this.server = server;
this.topic = topic;
- this.flinkPort = flinkPort;
}
@Override
public void run() {
// we manually feed data into the Kafka sink
- FlinkKafkaProducerBase<String> producer = null;
+ RichFunction producer = null;
try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- DataStream<String> stream = env.addSource(new SourceFunction<String>() {
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- final StringBuilder bld = new StringBuilder();
- final Random rnd = new Random();
- while (running) {
- bld.setLength(0);
- int len = rnd.nextInt(100) + 1;
- for (int i = 0; i < len; i++) {
- bld.append((char) (rnd.nextInt(20) + 'a'));
- }
+ Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+ producerProperties.setProperty("retries", "3");
+ StreamTransformation<String> mockTransform = new MockStreamTransformation();
+ DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+ DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+ producerProperties, new FixedPartitioner<String>());
+ StreamSink<String> producerOperator = sink.getTransformation().getOperator();
+ producer = (RichFunction) producerOperator.getUserFunction();
+ producer.setRuntimeContext(new MockRuntimeContext(1,0));
+ producer.open(new Configuration());
- String next = bld.toString();
- ctx.collect(next);
- }
- }
+ final StringBuilder bld = new StringBuilder();
+ final Random rnd = new Random();
- @Override
- public void cancel() {
- running = false;
+ while (running) {
+ bld.setLength(0);
+
+ int len = rnd.nextInt(100) + 1;
+ for (int i = 0; i < len; i++) {
+ bld.append((char) (rnd.nextInt(20) + 'a') );
}
- });
- Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
- producerProperties.setProperty("retries", "3");
- server.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
- producerProperties, new FixedPartitioner<String>());
- env.execute("String generator");
+ String next = bld.toString();
+ producerOperator.processElement(new StreamRecord<>(next));
+ }
}
catch (Throwable t) {
this.error = t;
@@ -228,14 +181,38 @@ public class DataGenerators {
}
}
}
-
+
public void shutdown() {
this.running = false;
this.interrupt();
}
-
+
public Throwable getError() {
return this.error;
}
+
+ private static class MockStreamTransformation extends StreamTransformation<String> {
+ public MockStreamTransformation() {
+ super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ return null;
+ }
+ }
+
+ public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ return null;
+ }
+ }
}
-}
+}
\ No newline at end of file
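With the remote environment gone, tests drive the generator as a plain thread again. Pieced together from the KafkaConsumerTestBase hunks above, usage now looks roughly like this:

    // Producer side: just a thread, nothing to cancel on the cluster.
    DataGenerators.InfiniteStringsGenerator generator =
            new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
    generator.start();

    // ... run and cancel the consumer job under test ...

    if (generator.isAlive()) {
        generator.shutdown();
        generator.join();
    }
    // Surface any failure from the generator thread.
    if (generator.getError() != null) {
        throw new Exception("String generator failed", generator.getError());
    }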
[2/2] flink git commit: [hotfix][kafka] Backport Kafka09FetcherTest for Kafka010
Posted by rm...@apache.org.
[hotfix][kafka] Backport Kafka09FetcherTest for Kafka010
This closes #2627
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d5bca0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d5bca0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d5bca0a
Branch: refs/heads/master
Commit: 3d5bca0abcb6071c2eedfedce36b883a6f605687
Parents: 744f8eb
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Oct 12 14:03:01 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Oct 13 10:23:37 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/Kafka010FetcherTest.java | 300 +++++++++++++++++++
1 file changed, 300 insertions(+)
----------------------------------------------------------------------
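The test below hinges on two ingredients: PowerMock's whenNew(...) intercepts the KafkaConsumer that the fetcher constructs internally, and latches let the test park the consumer's poll() until the test decides to release it. Condensed from the test that follows:

    // Mock consumer whose poll() blocks until the test triggers the latch.
    final MultiShotLatch blockerLatch = new MultiShotLatch();
    KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
        @Override
        public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
            blockerLatch.await();
            return ConsumerRecords.empty();
        }
    });

    // Any "new KafkaConsumer(...)" inside the class under test now returns
    // the mock; this needs @RunWith(PowerMockRunner.class) and
    // @PrepareForTest(Kafka010Fetcher.class) on the test class.
    whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);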
http://git-wip-us.apache.org/repos/asf/flink/blob/3d5bca0a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
new file mode 100644
index 0000000..8f0b170
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka010Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka010Fetcher.class)
+public class Kafka010FetcherTest {
+
+ @Test
+ public void testCommitDoesNotBlock() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
+
+ final AtomicReference<Throwable> commitError = new AtomicReference<>();
+ final Thread committer = new Thread("committer runner") {
+ @Override
+ public void run() {
+ try {
+ fetcher.commitSpecificOffsetsToKafka(testCommitData);
+ } catch (Throwable t) {
+ commitError.set(t);
+ }
+ }
+ };
+ committer.start();
+
+ // ----- ensure that the committer finishes in time -----
+ committer.join(30000);
+ assertFalse("The committer did not finish in time", committer.isAlive());
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+
+ @Test
+ public void ensureOffsetsGetCommitted() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+ final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+ final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+ testCommitData1.put(testPartition1, 11L);
+ testCommitData1.put(testPartition2, 18L);
+
+ final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+ testCommitData2.put(testPartition1, 19L);
+ testCommitData2.put(testPartition2, 28L);
+
+ final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+ // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+ OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+ commitStore.add(offsets);
+ callback.onComplete(offsets, null);
+
+ return null;
+ }
+ }).when(mockConsumer).commitAsync(
+ Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // ----- trigger the first offset commit -----
+
+ fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+ Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(12L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(18L, entry.getValue().offset());
+ }
+ }
+
+ // ----- trigger the second offset commit -----
+
+ fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+ Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(20L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(28L, entry.getValue().offset());
+ }
+ }
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+}
\ No newline at end of file
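A detail worth noting in the assertions above: a commit request of offset 11 for the subscribed partition (test, 42) is expected to land in Kafka as 12. Flink tracks the last processed offset per partition, while Kafka's committed offset conventionally means the next offset to read, so the fetcher adds one before committing, as the test data implies. A worked illustration (a sketch; the actual conversion happens inside the fetcher):

    long lastProcessedOffset = 11L;                        // what Flink has checkpointed
    OffsetAndMetadata toCommit = new OffsetAndMetadata(lastProcessedOffset + 1);
    assertEquals(12L, toCommit.offset());                  // matches the assertion in the test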