You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:38 UTC
[08/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-kafka*
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ac278fb..03a23f5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,14 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -82,6 +74,15 @@ import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
@@ -90,6 +91,7 @@ import org.junit.Rule;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@@ -114,14 +116,15 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+/**
+ * Abstract test base for all Kafka consumer tests.
+ */
@SuppressWarnings("serial")
public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
+
@Rule
public RetryRule retryRule = new RetryRule();
-
// ------------------------------------------------------------------------
// Common Test Preparation
// ------------------------------------------------------------------------
@@ -134,8 +137,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public void ensureNoJobIsLingering() throws Exception {
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
}
-
-
+
// ------------------------------------------------------------------------
// Suite of Tests
//
@@ -146,7 +148,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
/**
* Test that ensures the KafkaConsumer is properly failing if the topic doesn't exist
- * and a wrong broker was specified
+ * and a wrong broker was specified.
*
* @throws Exception
*/
@@ -173,8 +175,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataStream<String> stream = see.addSource(source);
stream.print();
see.execute("No broker test");
- } catch(JobExecutionException jee) {
- if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+ } catch (JobExecutionException jee) {
+ if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
assertTrue(jee.getCause() instanceof TimeoutException);
TimeoutException te = (TimeoutException) jee.getCause();
@@ -191,7 +193,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
+ * Ensures that the committed offsets to Kafka are the offsets of "the next record to process".
*/
public void runCommitOffsetsToKafka() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
@@ -354,13 +356,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
* is committed to Kafka, even if some partitions are not read.
*
- * Test:
+ * <p>Test:
* - Create 3 partitions
* - write 50 messages into each.
* - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka.
* - Check if the offsets in Kafka are set to 50 for the three partitions
*
- * See FLINK-3440 as well
+ * <p>See FLINK-3440 as well
*/
public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
@@ -518,7 +520,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env
.addSource(latestReadingConsumer).setParallelism(parallelism)
- .flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
+ .flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
if (value.f1 - recordsInEachPartition < 0) {
@@ -605,12 +607,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to "auto.offset.reset"
* behaviour when necessary, when explicitly configured to start from group offsets.
*
- * The partitions and their committed group offsets are setup as:
+ * <p>The partitions and their committed group offsets are setup as:
* partition 0 --> committed offset 23
* partition 1 --> no commit offset
* partition 2 --> committed offset 43
*
- * When configured to start from group offsets, each partition should read:
+ * <p>When configured to start from group offsets, each partition should read:
* partition 0 --> start from offset 23, read to offset 49 (27 records)
* partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0, read to offset 49 (50 records)
* partition 2 --> start from offset 43, read to offset 49 (7 records)
@@ -653,20 +655,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* start from specific offsets. For partitions which a specific offset can not be found for, the starting position
* for them should fallback to the group offsets behaviour.
*
- * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+ * <p>4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
* partition 0 --> start from offset 19
* partition 1 --> not set
* partition 2 --> start from offset 22
* partition 3 --> not set
* partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
*
- * The partitions and their committed group offsets are setup as:
+ * <p>The partitions and their committed group offsets are setup as:
* partition 0 --> committed offset 23
* partition 1 --> committed offset 31
* partition 2 --> committed offset 43
* partition 3 --> no commit offset
*
- * When configured to start from these specific offsets, each partition should read:
+ * <p>When configured to start from these specific offsets, each partition should read:
* partition 0 --> start from offset 19, read to offset 49 (31 records)
* partition 1 --> fallback to group offsets, so start from offset 31, read to offset 49 (19 records)
* partition 2 --> start from offset 22, read to offset 49 (28 records)
@@ -711,7 +713,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
-
+
/**
* Ensure Kafka is working on both producer and consumer side.
* This executes a job that contains two Flink pipelines.
@@ -719,22 +721,22 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* <pre>
* (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
* </pre>
- *
- * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+ *
+ * <p>We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
* does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
* cause the test to fail.
*
- * This test also ensures that FLINK-3156 doesn't happen again:
+ * <p>This test also ensures that FLINK-3156 doesn't happen again:
*
- * The following situation caused a NPE in the FlinkKafkaConsumer
+ * <p>The following situation caused a NPE in the FlinkKafkaConsumer
*
- * topic-1 <-- elements are only produced into topic1.
+ * <p>topic-1 <-- elements are only produced into topic1.
* topic-2
*
- * Therefore, this test is consuming as well from an empty topic.
+ * <p>Therefore, this test is consuming as well from an empty topic.
*
*/
- @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
+ @RetryOnException(times = 2, exception = kafka.common.NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
@@ -763,7 +765,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// ----------- add producer dataflow ----------
- DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long, String>>() {
private boolean running = true;
@@ -772,7 +774,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
int limit = cnt + elementsPerPartition;
-
while (running && cnt < limit) {
ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
cnt++;
@@ -1002,13 +1003,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "multi-source-one-partitions exactly once test");
-
deleteTestTopic(topic);
}
-
-
+
/**
- * Tests that the source can be properly canceled when reading full partitions.
+ * Tests that the source can be properly canceled when reading full partitions.
*/
public void runCancelingOnFullInputTest() throws Exception {
final String topic = "cancelingOnFullTopic";
@@ -1056,7 +1055,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Thread.sleep(2000);
Throwable failueCause = jobError.get();
- if(failueCause != null) {
+ if (failueCause != null) {
failueCause.printStackTrace();
Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
}
@@ -1089,7 +1088,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Tests that the source can be properly canceled when reading empty partitions.
+ * Tests that the source can be properly canceled when reading empty partitions.
*/
public void runCancelingOnEmptyInputTest() throws Exception {
final String topic = "cancelingOnEmptyInputTopic";
@@ -1149,7 +1148,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Tests that the source can be properly canceled when reading full partitions.
+ * Tests that the source can be properly canceled when reading full partitions.
*/
public void runFailOnDeployTest() throws Exception {
final String topic = "failOnDeployTopic";
@@ -1198,19 +1197,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Test producing and consuming into multiple topics
+ * Test producing and consuming into multiple topics.
* @throws Exception
*/
public void runProduceConsumeMultipleTopics() throws Exception {
- final int NUM_TOPICS = 5;
- final int NUM_ELEMENTS = 20;
+ final int numTopics = 5;
+ final int numElements = 20;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
-
+
// create topics with content
final List<String> topics = new ArrayList<>();
- for (int i = 0; i < NUM_TOPICS; i++) {
+ for (int i = 0; i < numTopics; i++) {
final String topic = "topic-" + i;
topics.add(topic);
// create topic
@@ -1227,8 +1226,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception {
int partition = getRuntimeContext().getIndexOfThisSubtask();
- for (int topicId = 0; topicId < NUM_TOPICS; topicId++) {
- for (int i = 0; i < NUM_ELEMENTS; i++) {
+ for (int topicId = 0; topicId < numTopics; topicId++) {
+ for (int i = 0; i < numElements; i++) {
ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId));
}
}
@@ -1255,7 +1254,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
- Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
+ Map<String, Integer> countPerTopic = new HashMap<>(numTopics);
@Override
public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
Integer count = countPerTopic.get(value.f2);
@@ -1268,10 +1267,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// check map:
for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
- if (el.getValue() < NUM_ELEMENTS) {
+ if (el.getValue() < numElements) {
break; // not enough yet
}
- if (el.getValue() > NUM_ELEMENTS) {
+ if (el.getValue() > numElements) {
throw new RuntimeException("There is a failure in the test. I've read " +
el.getValue() + " from topic " + el.getKey());
}
@@ -1283,17 +1282,17 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
tryExecute(env, "Count elements from the topics");
-
// delete all topics again
- for (int i = 0; i < NUM_TOPICS; i++) {
+ for (int i = 0; i < numTopics; i++) {
final String topic = "topic-" + i;
deleteTestTopic(topic);
}
}
/**
- * Test Flink's Kafka integration also with very big records (30MB)
- * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+ * Test Flink's Kafka integration also with very big records (30MB).
+ *
+ * <p>see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
*
*/
public void runBigRecordTestTopology() throws Exception {
@@ -1337,11 +1336,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
if (elCnt == 11) {
throw new SuccessException();
} else {
- throw new RuntimeException("There have been "+elCnt+" elements");
+ throw new RuntimeException("There have been " + elCnt + " elements");
}
}
if (elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: "+elCnt);
+ throw new RuntimeException("More than 10 elements seen: " + elCnt);
}
}
});
@@ -1395,7 +1394,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
-
public void runBrokerFailureTest() throws Exception {
final String topic = "brokerFailureTestTopic";
@@ -1404,7 +1402,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final int totalElements = parallelism * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;
-
createTestTopic(topic, parallelism, 2);
DataGenerators.generateRandomizedIntegerSequence(
@@ -1417,7 +1414,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
LOG.info("Leader to shutdown {}", leaderId);
-
// run the topology (the consumers must handle the failures)
DeserializationSchema<Integer> schema =
@@ -1450,7 +1446,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public void runKeyValueTest() throws Exception {
final String topic = "keyvaluetest";
createTestTopic(topic, 1, 1);
- final int ELEMENT_COUNT = 5000;
+ final int elementCount = 5000;
// ----------- Write some data into Kafka -------------------
@@ -1463,7 +1459,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Override
public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
Random rnd = new Random(1337);
- for (long i = 0; i < ELEMENT_COUNT; i++) {
+ for (long i = 0; i < elementCount; i++) {
PojoValue pojo = new PojoValue();
pojo.when = new Date(rnd.nextLong());
pojo.lon = rnd.nextLong();
@@ -1473,6 +1469,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
ctx.collect(new Tuple2<>(key, pojo));
}
}
+
@Override
public void cancel() {
}
@@ -1491,26 +1488,25 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
-
KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
- fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+ fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>() {
long counter = 0;
@Override
public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
// the elements should be in order.
- Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
+ Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter);
if (value.f1.lat % 2 == 0) {
assertNull("key was not null", value.f0);
} else {
Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
}
counter++;
- if (counter == ELEMENT_COUNT) {
+ if (counter == elementCount) {
// we got the right number of elements
throw new SuccessException();
}
@@ -1522,22 +1518,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
- public static class PojoValue {
+ private static class PojoValue {
public Date when;
public long lon;
public long lat;
public PojoValue() {}
}
-
/**
- * Test delete behavior and metrics for producer
+ * Test delete behavior and metrics for producer.
* @throws Exception
*/
public void runAllDeletesTest() throws Exception {
final String topic = "alldeletestest";
createTestTopic(topic, 1, 1);
- final int ELEMENT_COUNT = 300;
+ final int elementCount = 300;
// ----------- Write some data into Kafka -------------------
@@ -1550,12 +1545,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Override
public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
Random rnd = new Random(1337);
- for (long i = 0; i < ELEMENT_COUNT; i++) {
+ for (long i = 0; i < elementCount; i++) {
final byte[] key = new byte[200];
rnd.nextBytes(key);
ctx.collect(new Tuple2<>(key, (PojoValue) null));
}
}
+
@Override
public void cancel() {
}
@@ -1589,7 +1585,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// ensure that deleted messages are passed as nulls
assertNull(value.f1);
counter++;
- if (counter == ELEMENT_COUNT) {
+ if (counter == elementCount) {
// we got the right number of elements
throw new SuccessException();
}
@@ -1608,8 +1604,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
*/
public void runEndOfStreamTest() throws Exception {
- final int ELEMENT_COUNT = 300;
- final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
+ final int elementCount = 300;
+ final String topic = writeSequence("testEndOfStream", elementCount, 1, 1);
// read using custom schema
final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1621,21 +1617,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
props.putAll(standardProps);
props.putAll(secureProps);
- DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
- fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
+ DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(elementCount), props));
+ fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
// noop ;)
}
});
- JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+ JobExecutionResult result = tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
deleteTestTopic(topic);
}
/**
- * Test metrics reporting for consumer
+ * Test metrics reporting for consumer.
*
* @throws Exception
*/
@@ -1690,9 +1686,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
env1.execute("Metrics test job");
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.warn("Got exception during execution", t);
- if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+ if (!(t instanceof JobCancellationException)) { // we'll cancel the job
error.f0 = t;
}
}
@@ -1723,7 +1719,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// check that offsets are correctly reported
for (ObjectName object : offsetMetrics) {
Object offset = mBeanServer.getAttribute(object, "Value");
- if((long) offset >= 0) {
+ if ((long) offset >= 0) {
numPosOffsets++;
}
}
@@ -1738,7 +1734,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
-
LOG.info("Found all JMX metrics. Cancelling job.");
} finally {
// cancel
@@ -1755,12 +1750,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
+ private static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
- public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
-
final int finalCount;
int count = 0;
-
+
TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
@@ -1785,7 +1779,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
-
// ------------------------------------------------------------------------
// Reading writing test data sets
// ------------------------------------------------------------------------
@@ -1916,13 +1909,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
String baseTopicName,
final int numElements,
final int parallelism,
- final int replicationFactor) throws Exception
- {
+ final int replicationFactor) throws Exception {
LOG.info("\n===================================\n" +
"== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
"===================================");
- final TypeInformation<Tuple2<Integer, Integer>> resultType =
+ final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
@@ -1932,15 +1924,15 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KeyedDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
-
+
final int maxNumAttempts = 10;
for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
-
+
final String topicName = baseTopicName + '-' + attempt;
-
+
LOG.info("Writing attempt #1");
-
+
// -------- Write the Sequence --------
createTestTopic(topicName, parallelism, replicationFactor);
@@ -1948,33 +1940,33 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
writeEnv.getConfig().disableSysoutLogging();
-
+
DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
+
private boolean running = true;
-
+
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
int cnt = 0;
int partition = getRuntimeContext().getIndexOfThisSubtask();
-
+
while (running && cnt < numElements) {
ctx.collect(new Tuple2<>(partition, cnt));
cnt++;
}
}
-
+
@Override
public void cancel() {
running = false;
}
}).setParallelism(parallelism);
-
+
// the producer must not produce duplicates
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
-
+
kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
.setParallelism(parallelism);
@@ -1987,21 +1979,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
continue;
}
-
+
LOG.info("Finished writing sequence");
// -------- Validate the Sequence --------
-
+
// we need to validate the sequence, because kafka's producers are not exactly once
LOG.info("Validating sequence");
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-
+
final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
readEnv.getConfig().disableSysoutLogging();
readEnv.setParallelism(parallelism);
-
+
Properties readProps = (Properties) standardProps.clone();
readProps.setProperty("group.id", "flink-tests-validator");
readProps.putAll(secureProps);
@@ -2010,10 +2002,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
readEnv
.addSource(consumer)
.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-
+
private final int totalCount = parallelism * numElements;
private int count = 0;
-
+
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
if (++count == totalCount) {
@@ -2024,9 +2016,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}).setParallelism(1)
.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
-
+
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
+
Thread runner = new Thread() {
@Override
public void run() {
@@ -2038,15 +2030,15 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
};
runner.start();
-
+
final long deadline = System.nanoTime() + 10_000_000_000L;
long delay;
while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
- runner.join(delay/1_000_000L);
+ runner.join(delay / 1_000_000L);
}
-
+
boolean success;
-
+
if (runner.isAlive()) {
// did not finish in time, maybe the producer dropped one or more records and
// the validation did not reach the exit point
@@ -2065,7 +2057,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-
+
if (success) {
// everything is good!
return topicName;
@@ -2075,7 +2067,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// fall through the loop
}
}
-
+
throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
}
@@ -2090,28 +2082,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
// we request only one stream per consumer instance. Kafka will make sure that each consumer group
// will see each message only once.
- Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
if (streams.size() != 1) {
- throw new RuntimeException("Expected only one message stream but got "+streams.size());
+ throw new RuntimeException("Expected only one message stream but got " + streams.size());
}
List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
if (kafkaStreams == null) {
- throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+ throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
}
if (kafkaStreams.size() != 1) {
- throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+ throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams");
}
LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
int read = 0;
- while(iteratorToRead.hasNext()) {
+ while (iteratorToRead.hasNext()) {
read++;
result.add(iteratorToRead.next());
if (read == stopAfter) {
- LOG.info("Read "+read+" elements");
+ LOG.info("Read " + read + " elements");
return result;
}
}
@@ -2131,12 +2123,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
- private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer)
- throws IOException
- {
+ private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
+ throws IOException {
// write the sequence to log for debugging purposes
Properties newProps = new Properties(standardProps);
- newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+ newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
newProps.setProperty("auto.offset.reset", "smallest");
newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
newProps.putAll(secureProps);
@@ -2145,15 +2136,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
printTopic(topicName, printerConfig, deserializer, elements);
}
-
- public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+ private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
implements ListCheckpointed<Integer>, CheckpointListener {
private static final long serialVersionUID = 6334389850158707313L;
public static volatile boolean killedLeaderBefore;
public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
+
private final int shutdownBrokerId;
private final int failCount;
private int numElementsTotal;
@@ -2174,10 +2164,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Override
public T map(T value) throws Exception {
numElementsTotal++;
-
+
if (!killedLeaderBefore) {
Thread.sleep(10);
-
+
if (failer && numElementsTotal >= failCount) {
// shut down a Kafka broker
KafkaServer toShutDown = null;
@@ -2188,14 +2178,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
break;
}
}
-
+
if (toShutDown == null) {
StringBuilder listOfBrokers = new StringBuilder();
for (KafkaServer server : kafkaServer.getBrokers()) {
listOfBrokers.append(kafkaServer.getBrokerId(server));
listOfBrokers.append(" ; ");
}
-
+
throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+ " ; available brokers: " + listOfBrokers.toString());
}
@@ -2266,7 +2256,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
try {
ts.serialize(new Tuple2<>(element.f0, element.f1), out);
} catch (IOException e) {
- throw new RuntimeException("Error" ,e);
+ throw new RuntimeException("Error" , e);
}
return by.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index bcc8328..e292e13 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -43,6 +43,9 @@ import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Abstract test base for all Kafka producer tests.
+ */
@SuppressWarnings("serial")
public abstract class KafkaProducerTestBase extends KafkaTestBase {
@@ -50,7 +53,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* This test verifies that custom partitioning works correctly, with a default topic
* and dynamic topic. The number of partitions for each topic is deliberately different.
*
- * Test topology:
+ * <p>Test topology:
*
* <pre>
* +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
@@ -67,11 +70,11 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* \ | | | |
* +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
* </pre>
- *
- * Each topic has an independent mapper that validates the values come consistently from
+ *
+ * <p>Each topic has an independent mapper that validates the values come consistently from
* the correct Kafka partition of the topic it is responsible for.
- *
- * Each topic also has a final sink that validates that there are no duplicates and that all
+ *
+ * <p>Each topic also has a final sink that validates that there are no duplicates and that all
* partitions are present.
*/
public void runCustomPartitioningTest() {
@@ -171,7 +174,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
// ------------------------------------------------------------------------
- public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+ private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
private final Map<String, Integer> expectedTopicsToNumPartitions;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index f688660..d5c9276 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -58,7 +57,7 @@ import static org.junit.Assert.fail;
*/
@SuppressWarnings("serial")
public class KafkaShortRetentionTestBase implements Serializable {
-
+
protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
protected static final int NUM_TMS = 1;
@@ -66,7 +65,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
protected static final int TM_SLOTS = 8;
protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-
+
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
private static LocalFlinkMiniCluster flink;
@@ -90,7 +89,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
- if(kafkaServer.isSecureRunSupported()) {
+ if (kafkaServer.isSecureRunSupported()) {
secureProps = kafkaServer.getSecureProperties();
}
@@ -151,10 +150,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
-
// ----------- add producer dataflow ----------
-
DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
private boolean running = true;
@@ -164,7 +161,6 @@ public class KafkaShortRetentionTestBase implements Serializable {
int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
int limit = cnt + elementsPerPartition;
-
while (running && !stopProducer && cnt < limit) {
ctx.collect("element-" + cnt);
cnt++;
@@ -196,14 +192,13 @@ public class KafkaShortRetentionTestBase implements Serializable {
kafkaServer.deleteTestTopic(topic);
}
-
private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
private int numJumps;
long nextExpected = 0;
@Override
public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- if(offset != nextExpected) {
+ if (offset != nextExpected) {
numJumps++;
nextExpected = offset;
LOG.info("Registered now jump at offset {}", offset);
@@ -219,7 +214,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
@Override
public boolean isEndOfStream(String nextElement) {
- if( numJumps >= 5) {
+ if (numJumps >= 5) {
// we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
stopProducer = true;
return true;
@@ -233,15 +228,14 @@ public class KafkaShortRetentionTestBase implements Serializable {
}
}
-
/**
- * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none"
+ * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
* @throws Exception
*/
public void runFailOnAutoOffsetResetNone() throws Exception {
final String topic = "auto-offset-reset-none-test";
final int parallelism = 1;
-
+
kafkaServer.createTestTopic(topic, parallelism, 1);
final StreamExecutionEnvironment env =
@@ -249,7 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
-
+
// ----------- add consumer ----------
Properties customProps = new Properties();
@@ -263,10 +257,10 @@ public class KafkaShortRetentionTestBase implements Serializable {
try {
env.execute("Test auto offset reset none");
- } catch(Throwable e) {
+ } catch (Throwable e) {
// check if correct exception has been thrown
- if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
- && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
+ if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
+ && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
) {
throw e;
}
@@ -287,7 +281,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
customProps.putAll(standardProps);
customProps.putAll(secureProps);
customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
-
+
try {
kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
fail("should fail with an exception");
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index d4fe9cc..dcf3167 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
+package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -27,8 +26,11 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
+
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -38,6 +40,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+/**
+ * Abstract test base for all Kafka table sink tests.
+ */
public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
@@ -46,7 +51,7 @@ public abstract class KafkaTableSinkTestBase {
private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
- private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
+ private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
@Override
@@ -61,7 +66,7 @@ public abstract class KafkaTableSinkTestBase {
KafkaTableSink kafkaTableSink = spy(createTableSink());
kafkaTableSink.emitDataStream(dataStream);
- verify(dataStream).addSink(eq(PRODUCER));
+ verify(dataStream).addSink(eq(producer));
verify(kafkaTableSink).createKafkaProducer(
eq(TOPIC),
@@ -87,7 +92,7 @@ public abstract class KafkaTableSinkTestBase {
protected abstract SerializationSchema<Row> getSerializationSchema();
private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
+ return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
}
private static Properties createSinkProperties() {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 341df45..8028bfc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -18,23 +18,29 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;
+
+import java.util.Properties;
+
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+/**
+ * Abstract test base for all Kafka table sources.
+ */
public abstract class KafkaTableSourceTestBase {
private static final String TOPIC = "testTopic";
@@ -47,10 +53,14 @@ public abstract class KafkaTableSourceTestBase {
BasicTypeInfo.LONG_TYPE_INFO };
private static final Properties PROPERTIES = createSourceProperties();
- // Avro record that matches above schema
+ /**
+ * Avro record that matches above schema.
+ */
public static class AvroSpecificRecord extends SpecificRecordBase {
+ //CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
+ //CHECKSTYLE.ON: StaticVariableNameCheck
public Long mylong;
public String mystring;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 1837af6..c484a4b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -33,17 +33,15 @@ import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
/**
* The base for the Kafka tests. It brings up:
@@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit;
* <li>Three Kafka Brokers (mini clusters)</li>
* <li>A Flink mini cluster</li>
* </ul>
- *
+ *
* <p>Code in this test is based on the following GitHub repository:
* <a href="https://github.com/sakserv/hadoop-mini-clusters">
* https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
@@ -62,7 +60,7 @@ import java.util.concurrent.TimeUnit;
public abstract class KafkaTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-
+
protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
protected static final int NUM_TMS = 1;
@@ -74,7 +72,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static String brokerConnectionStrings;
protected static Properties standardProps;
-
+
protected static LocalFlinkMiniCluster flink;
protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -89,7 +87,7 @@ public abstract class KafkaTestBase extends TestLogger {
// ------------------------------------------------------------------------
// Setup and teardown of the mini clusters
// ------------------------------------------------------------------------
-
+
@BeforeClass
public static void prepare() throws ClassNotFoundException {
@@ -162,7 +160,7 @@ public abstract class KafkaTestBase extends TestLogger {
flink.shutdown();
}
- if(secureProps != null) {
+ if (secureProps != null) {
secureProps.clear();
}
@@ -170,12 +168,9 @@ public abstract class KafkaTestBase extends TestLogger {
}
-
-
// ------------------------------------------------------------------------
// Execution utilities
// ------------------------------------------------------------------------
-
protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
try {
@@ -200,7 +195,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
}
-
+
protected static void deleteTestTopic(String topic) {
kafkaServer.deleteTestTopic(topic);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 311a1a4..4df3465 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,11 +17,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.server.KafkaServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -31,8 +26,14 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import kafka.server.KafkaServer;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
/**
- * Abstract class providing a Kafka test environment
+ * Abstract class providing a Kafka test environment.
*/
public abstract class KafkaTestEnvironment {
@@ -89,9 +90,14 @@ public abstract class KafkaTestEnvironment {
// -- offset handlers
+ /**
+ * Simple interface to commit and retrieve offsets.
+ */
public interface KafkaOffsetHandler {
Long getCommittedOffset(String topicName, int partition);
+
void setCommittedOffset(String topicName, int partition, long offset);
+
void close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
deleted file mode 100644
index fa84199..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestFlinkFixedPartitioner {
-
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 4);
- Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
- part.open(2, 4);
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
- part.open(3, 4);
- Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.open(0, 2);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 2);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
- int[] partitions = new int[]{0,1};
-
- part.open(0, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 3);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
- part.open(2, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index c1a64c4..c83a97e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -23,13 +23,14 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -38,6 +39,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+/**
+ * Tests for the {@link AbstractFetcher}.
+ */
@SuppressWarnings("serial")
public class AbstractFetcherTest {
@@ -191,7 +195,7 @@ public class AbstractFetcherTest {
final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
// elements generate a watermark if the timestamp is a multiple of three
-
+
// elements for partition 1
fetcher.emitRecord(1L, part1, 1L);
fetcher.emitRecord(2L, part1, 2L);
@@ -211,11 +215,11 @@ public class AbstractFetcherTest {
fetcher.emitRecord(102L, part3, 2L);
assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
+
// now, we should have a watermark
assertTrue(sourceContext.hasWatermark());
assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
+
// advance partition 3
fetcher.emitRecord(1003L, part3, 3L);
fetcher.emitRecord(1004L, part3, 4L);
@@ -239,7 +243,7 @@ public class AbstractFetcherTest {
assertTrue(sourceContext.hasWatermark());
assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
}
-
+
@Test
public void testPeriodicWatermarks() throws Exception {
final String testTopic = "test topic name";
@@ -329,8 +333,7 @@ public class AbstractFetcherTest {
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
- long autoWatermarkInterval) throws Exception
- {
+ long autoWatermarkInterval) throws Exception {
super(
sourceContext,
assignedPartitionsWithStartOffsets,
@@ -391,7 +394,6 @@ public class AbstractFetcherTest {
}
}
-
@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
@@ -412,7 +414,7 @@ public class AbstractFetcherTest {
public boolean hasWatermark() {
return currentWatermark != null;
}
-
+
public Watermark getLatestWatermark() throws InterruptedException {
synchronized (watermarkLock) {
while (currentWatermark == null) {
@@ -430,7 +432,7 @@ public class AbstractFetcherTest {
private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
-
+
@Override
public long extractTimestamp(Long element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element);
@@ -456,6 +458,6 @@ public class AbstractFetcherTest {
public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
index b215bd3..4496a26 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -27,8 +27,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for the {@link KafkaTopicPartition}.
+ */
public class KafkaTopicPartitionTest {
-
+
@Test
public void validateUid() {
Field uidField;
@@ -40,14 +43,14 @@ public class KafkaTopicPartitionTest {
fail("serialVersionUID is not defined");
return;
}
-
+
assertTrue(Modifier.isStatic(uidField.getModifiers()));
assertTrue(Modifier.isFinal(uidField.getModifiers()));
assertTrue(Modifier.isPrivate(uidField.getModifiers()));
-
+
assertEquals(long.class, uidField.getType());
-
- // the UID has to be constant to make sure old checkpoints/savepoints can be read
+
+ // the UID has to be constant to make sure old checkpoints/savepoints can be read
try {
assertEquals(722083576322742325L, uidField.getLong(null));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
index 075b79b..a41125a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
@@ -18,13 +18,6 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.io.avro.generated.Address;
import org.apache.flink.api.io.avro.generated.Colors;
@@ -32,12 +25,21 @@ import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.Row;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
/**
* Utilities for creating Avro Schemas.
*/
public final class AvroTestUtils {
- private static String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
+ private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
/**
* Creates a flat Avro Schema for testing.
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c0fb836..b204ea9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,10 +18,6 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -43,14 +39,22 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Test data generators.
+ */
@SuppressWarnings("serial")
public class DataGenerators {
- public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
- KafkaTestEnvironment testServer, String topic,
- final int numPartitions,
- final int numElements,
- final boolean randomizeOrder) throws Exception {
+ public static void generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment env,
+ KafkaTestEnvironment testServer, String topic,
+ final int numPartitions,
+ final int numElements,
+ final boolean randomizeOrder) throws Exception {
env.setParallelism(numPartitions);
env.getConfig().disableSysoutLogging();
env.setRestartStrategy(RestartStrategies.noRestart());
@@ -65,8 +69,8 @@ public class DataGenerators {
// create a sequence
int[] elements = new int[numElements];
for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
- i < numElements;
- i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+ i < numElements;
+ i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
elements[i] = val;
}
@@ -99,7 +103,7 @@ public class DataGenerators {
Properties props = new Properties();
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
Properties secureProps = testServer.getSecureProperties();
- if(secureProps != null) {
+ if (secureProps != null) {
props.putAll(testServer.getSecureProperties());
}
@@ -119,6 +123,10 @@ public class DataGenerators {
// ------------------------------------------------------------------------
+ /**
+ * A generator that continuously writes strings into the configured topic. The generation is stopped if an exception
+ * occurs or {@link #shutdown()} is called.
+ */
public static class InfiniteStringsGenerator extends Thread {
private final KafkaTestEnvironment server;
@@ -129,7 +137,6 @@ public class DataGenerators {
private volatile boolean running = true;
-
public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
this.server = server;
this.topic = topic;
@@ -164,7 +171,7 @@ public class DataGenerators {
int len = rnd.nextInt(100) + 1;
for (int i = 0; i < len; i++) {
- bld.append((char) (rnd.nextInt(20) + 'a') );
+ bld.append((char) (rnd.nextInt(20) + 'a'));
}
String next = bld.toString();
@@ -211,7 +218,7 @@ public class DataGenerators {
}
}
- public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index ec64b00..c25eefb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -22,30 +22,35 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+/**
+ * A {@link RichMapFunction} that fails after the configured number of records have been processed.
+ *
+ * @param <T> The type of the elements passed through the mapper.
+ */
+public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
ListCheckpointed<Integer>, CheckpointListener, Runnable {
-
+
private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-
+
private static final long serialVersionUID = 6334389850158707313L;
-
+
public static volatile boolean failedBefore;
public static volatile boolean hasBeenCheckpointedBeforeFailure;
private final int failCount;
private int numElementsTotal;
private int numElementsThisTime;
-
+
private boolean failer;
private boolean hasBeenCheckpointed;
-
+
private Thread printer;
private volatile boolean printerRunning = true;
@@ -64,10 +69,10 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
public T map(T value) throws Exception {
numElementsTotal++;
numElementsThisTime++;
-
+
if (!failedBefore) {
Thread.sleep(10);
-
+
if (failer && numElementsTotal >= failCount) {
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
failedBefore = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
index 055326d..0fbe554 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -15,12 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Properties;
+/**
+ * Test configuration for a Kafka producer.
+ */
public class FakeStandardProducerConfig {
public static Properties get() {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index 131325f..9bbe1d3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -23,13 +23,16 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+/**
+ * Utilities for communicating with a JobManager through an {@link ActorGateway}.
+ */
public class JobManagerCommunicationUtils {
private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
@@ -43,7 +46,6 @@ public class JobManagerCommunicationUtils {
Object result = Await.result(listResponse, askTimeout);
List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
if (jobs.isEmpty()) {
return;
}
@@ -84,13 +86,13 @@ public class JobManagerCommunicationUtils {
public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
JobStatusMessage status = null;
-
+
for (int i = 0; i < 200; i++) {
// find the jobID
Future<Object> listResponse = jobManager.ask(
JobManagerMessages.getRequestRunningJobsStatus(),
askTimeout);
-
+
List<JobStatusMessage> jobs;
try {
Object result = Await.result(listResponse, askTimeout);
@@ -99,7 +101,7 @@ public class JobManagerCommunicationUtils {
catch (Exception e) {
throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
}
-
+
if (jobs.isEmpty()) {
// try again, fall through the loop
Thread.sleep(50);
@@ -107,33 +109,33 @@ public class JobManagerCommunicationUtils {
else if (jobs.size() == 1) {
status = jobs.get(0);
}
- else if(name != null) {
- for(JobStatusMessage msg: jobs) {
- if(msg.getJobName().equals(name)) {
+ else if (name != null) {
+ for (JobStatusMessage msg: jobs) {
+ if (msg.getJobName().equals(name)) {
status = msg;
}
}
- if(status == null) {
- throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
+ if (status == null) {
+ throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
}
} else {
String jobNames = "";
- for(JobStatusMessage jsm: jobs) {
+ for (JobStatusMessage jsm: jobs) {
jobNames += jsm.getJobName() + ", ";
}
throw new Exception("Could not cancel job - more than one running job: " + jobNames);
}
}
-
+
if (status == null) {
- throw new Exception("Could not cancel job - no running jobs");
+ throw new Exception("Could not cancel job - no running jobs");
}
else if (status.getJobState().isGloballyTerminalState()) {
throw new Exception("Could not cancel job - job is not running any more");
}
-
+
JobID jobId = status.getJobId();
-
+
Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
try {
Await.result(response, askTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
index e105e01..29e469d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -23,14 +23,16 @@ import org.apache.flink.api.common.functions.MapFunction;
import java.util.HashSet;
import java.util.Set;
-
+/**
+ * {@link MapFunction} that verifies that the partitioning is identical.
+ */
public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
private static final long serialVersionUID = 1088381231244959088L;
-
+
/* the partitions from which this function received data */
private final Set<Integer> myPartitions = new HashSet<>();
-
+
private final int numPartitions;
private final int maxPartitions;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
index 1d61229..040f15c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.functions.MapFunction;
/**
* An identity map function that sleeps between elements, throttling the
* processing speed.
- *
+ *
* @param <T> The type mapped.
*/
-public class ThrottledMapper<T> implements MapFunction<T,T> {
+public class ThrottledMapper<T> implements MapFunction<T, T> {
private static final long serialVersionUID = 467008933767159126L;
@@ -41,4 +41,4 @@ public class ThrottledMapper<T> implements MapFunction<T,T> {
Thread.sleep(this.sleep);
return value;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
index e7fff52..6f2c4a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -33,13 +33,13 @@ public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer
public Tuple2FlinkPartitioner(int expectedPartitions) {
this.expectedPartitions = expectedPartitions;
}
-
+
@Override
public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
if (partitions.length != expectedPartitions) {
throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
}
-
+
return next.f0;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 46e70fd..5ace012 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,14 +30,17 @@ import java.util.BitSet;
import java.util.Collections;
import java.util.List;
+/**
+ * A {@link RichSinkFunction} that verifies that no duplicate records are generated.
+ */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
private static final long serialVersionUID = 1748426382527469932L;
-
+
private final int numElementsTotal;
-
+
private BitSet duplicateChecker = new BitSet(); // this is checkpointed
private int numElements; // this is checkpointed
@@ -45,11 +49,10 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
this.numElementsTotal = numElementsTotal;
}
-
@Override
public void invoke(Integer value) throws Exception {
numElements++;
-
+
if (duplicateChecker.get(value)) {
throw new Exception("Received a duplicate: " + value);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 37ed408..9aa1207 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -18,9 +18,10 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.flink.configuration.ConfigConstants;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
/**
* Simple ZooKeeper serializer for Strings.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
index 16c226f..a3fb2b0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-