Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:19 UTC
[29/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
new file mode 100644
index 0000000..dccf698
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test base providing a special Kafka broker with a log retention of only 250 ms.
+ * This lets us verify that the consumer properly handles the out-of-range offset
+ * errors that arise when data expires before it is read.
+ */
+@SuppressWarnings("serial")
+public class KafkaShortRetentionTestBase implements Serializable {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+
+ private static KafkaTestEnvironment kafkaServer;
+ private static Properties standardProps;
+ private static LocalFlinkMiniCluster flink;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ protected static Properties secureProps = new Properties();
+
+ @BeforeClass
+ public static void prepare() throws IOException, ClassNotFoundException {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaShortRetentionTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ Configuration flinkConfig = new Configuration();
+
+ // dynamically load the implementation for the test
+ Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+ kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+ LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+ if (kafkaServer.isSecureRunSupported()) {
+ secureProps = kafkaServer.getSecureProperties();
+ }
+
+ Properties specificProperties = new Properties();
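+ // Kafka gives log.retention.ms precedence over the minutes/hours settings;
+ // setting all three makes the 250 ms retention unambiguous.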
+ specificProperties.setProperty("log.retention.hours", "0");
+ specificProperties.setProperty("log.retention.minutes", "0");
+ specificProperties.setProperty("log.retention.ms", "250");
+ specificProperties.setProperty("log.retention.check.interval.ms", "100");
+ kafkaServer.prepare(1, specificProperties, false);
+
+ standardProps = kafkaServer.getStandardProperties();
+
+ // also start a re-usable Flink mini cluster
+ flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
+ flink.start();
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ if (flink != null) {
+ flink.shutdown();
+ }
+ kafkaServer.shutdown();
+
+ secureProps.clear();
+ }
+
+ // flag used by the deserialization schema to stop the producer once enough
+ // offset jumps have been observed
+ private static boolean stopProducer = false;
+
+ /**
+ * This test concurrently writes to and reads from a Kafka topic.
+ * The job runs for a while; a special deserialization schema verifies that the
+ * consumed offsets are non-contiguous, because the data expires faster than it
+ * is consumed (with auto.offset.reset = 'earliest', some offsets never show up).
+ */
+ public void runAutoOffsetResetTest() throws Exception {
+ final String topic = "auto-offset-reset-test";
+
+ final int parallelism = 1;
+ final int elementsPerPartition = 50000;
+
+ Properties tprops = new Properties();
+ tprops.setProperty("retention.ms", "250");
+ kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
+
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
+ env.setParallelism(parallelism);
+ env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+ env.getConfig().disableSysoutLogging();
+
+ // ----------- add producer dataflow ----------
+
+ DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext<String> ctx) throws InterruptedException {
+ int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+ int limit = cnt + elementsPerPartition;
+
+ while (running && !stopProducer && cnt < limit) {
+ ctx.collect("element-" + cnt);
+ cnt++;
+ Thread.sleep(10);
+ }
+ LOG.info("Stopping producer");
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
+
+ // ----------- add consumer dataflow ----------
+
+ NonContinuousOffsetsDeserializationSchema deserSchema = new NonContinuousOffsetsDeserializationSchema();
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
+
+ DataStreamSource<String> consuming = env.addSource(source);
+ consuming.addSink(new DiscardingSink<String>());
+
+ tryExecute(env, "run auto offset reset test");
+
+ kafkaServer.deleteTestTopic(topic);
+ }
+
+ private class NonContinuousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
+ private int numJumps;
+ long nextExpected = 0;
+
+ @Override
+ public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ if (offset != nextExpected) {
+ numJumps++;
+ nextExpected = offset;
+ LOG.info("Registered new jump at offset {}", offset);
+ }
+ nextExpected++;
+ try {
+ Thread.sleep(10); // slow down data consumption to trigger log eviction
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Stopping it");
+ }
+ return "";
+ }
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ if (numJumps >= 5) {
+ // we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
+ stopProducer = true;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeInfoParser.parse("String");
+ }
+ }
+
+ /**
+ * Ensures that the consumer fails fast if "auto.offset.reset" is set to "none".
+ */
+ public void runFailOnAutoOffsetResetNone() throws Exception {
+ final String topic = "auto-offset-reset-none-test";
+ final int parallelism = 1;
+
+ kafkaServer.createTestTopic(topic, parallelism, 1);
+
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
+ env.setParallelism(parallelism);
+ env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+ env.getConfig().disableSysoutLogging();
+
+ // ----------- add consumer ----------
+
+ Properties customProps = new Properties();
+ customProps.putAll(standardProps);
+ customProps.putAll(secureProps);
+ customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
+ FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+
+ DataStreamSource<String> consuming = env.addSource(source);
+ consuming.addSink(new DiscardingSink<String>());
+
+ try {
+ env.execute("Test auto offset reset none");
+ } catch (Throwable e) {
+ System.out.println("MESSAGE: " + e.getCause().getCause().getMessage());
+ // check if correct exception has been thrown
+ if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
+ && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
+ ) {
+ throw e;
+ }
+ }
+
+ kafkaServer.deleteTestTopic(topic);
+ }
+
+ public void runFailOnAutoOffsetResetNoneEager() throws Exception {
+ final String topic = "auto-offset-reset-none-test";
+ final int parallelism = 1;
+
+ kafkaServer.createTestTopic(topic, parallelism, 1);
+
+ // ----------- add consumer ----------
+
+ Properties customProps = new Properties();
+ customProps.putAll(standardProps);
+ customProps.putAll(secureProps);
+ customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
+
+ try {
+ kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ assertTrue(e.getMessage().contains("none"));
+ }
+
+ kafkaServer.deleteTestTopic(topic);
+ }
+}
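
A version-specific test would typically just extend this base and delegate to the
run methods above. A minimal sketch (the subclass name and JUnit wiring are
illustrative, not part of this commit):

    import org.junit.Test;

    // hypothetical subclass living in a version-specific connector module
    public class Kafka08ShortRetentionTest extends KafkaShortRetentionTestBase {

        @Test(timeout = 60000)
        public void testAutoOffsetReset() throws Exception {
            runAutoOffsetResetTest();
        }

        @Test(timeout = 60000)
        public void testAutoOffsetResetNone() throws Exception {
            runFailOnAutoOffsetResetNone();
        }
    }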
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
new file mode 100644
index 0000000..ae0af52
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSinkTestBase {
+
+ private static final String TOPIC = "testTopic";
+ protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+ private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+ private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+ private static final Properties PROPERTIES = createSinkProperties();
+ @SuppressWarnings("unchecked")
+ private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
+ TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
+
+ @Override
+ protected void flush() {}
+ };
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testKafkaTableSink() throws Exception {
+ DataStream dataStream = mock(DataStream.class);
+
+ KafkaTableSink kafkaTableSink = spy(createTableSink());
+ kafkaTableSink.emitDataStream(dataStream);
+
+ verify(dataStream).addSink(eq(PRODUCER));
+
+ verify(kafkaTableSink).createKafkaProducer(
+ eq(TOPIC),
+ eq(PROPERTIES),
+ any(getSerializationSchema().getClass()),
+ eq(PARTITIONER));
+ }
+
+ @Test
+ public void testConfiguration() {
+ KafkaTableSink kafkaTableSink = createTableSink();
+ KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+ assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+ assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+ assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+ assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
+ }
+
+ protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
+ KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
+ protected abstract SerializationSchema<Row> getSerializationSchema();
+
+ private KafkaTableSink createTableSink() {
+ return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
+ }
+
+ private static Properties createSinkProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", "localhost:12345");
+ return properties;
+ }
+
+ private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
+ @Override
+ public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+ return 0;
+ }
+ }
+}
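
The test above leans on Mockito's spy/verify pattern: spying on the sink lets the
test assert that emitDataStream() delegated to createKafkaProducer() with the
configured topic, properties and partitioner. The pattern in isolation (the classes
below are invented purely for illustration):

    import static org.mockito.Matchers.eq;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.verify;

    class Greeter {
        public String render(String name) { return "hello " + name; }
        public String greet(String name) { return render(name); }
    }

    public class SpyPatternExample {
        public static void main(String[] args) {
            Greeter greeter = spy(new Greeter());
            greeter.greet("kafka");
            // passes only if greet() delegated to render() with the same argument
            verify(greeter).render(eq("kafka"));
        }
    }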
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
new file mode 100644
index 0000000..2a281e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSourceTestBase {
+
+ private static final String TOPIC = "testTopic";
+ private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" };
+ private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO };
+ private static final Properties PROPERTIES = createSourceProperties();
+
+ @Test
+ public void testKafkaTableSource() {
+ KafkaTableSource kafkaTableSource = spy(createTableSource());
+ StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
+ kafkaTableSource.getDataStream(env);
+
+ verify(env).addSource(any(getFlinkKafkaConsumer()));
+
+ verify(kafkaTableSource).getKafkaConsumer(
+ eq(TOPIC),
+ eq(PROPERTIES),
+ any(getDeserializationSchema()));
+ }
+
+ protected abstract KafkaTableSource createTableSource(String topic, Properties properties,
+ String[] fieldNames, TypeInformation<?>[] typeInfo);
+
+ protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
+
+ protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
+
+ private KafkaTableSource createTableSource() {
+ return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES);
+ }
+
+ private static Properties createSourceProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("zookeeper.connect", "dummy");
+ properties.setProperty("group.id", "dummy");
+ return properties;
+ }
+}
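
A concrete subclass wires in the version-specific table source, consumer class and
deserialization schema. Roughly as follows (class names such as Kafka09JsonTableSource,
JsonRowDeserializationSchema and FlinkKafkaConsumer09 are assumptions here, not
verified against this commit):

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {

        @Override
        protected KafkaTableSource createTableSource(String topic, Properties properties,
                String[] fieldNames, TypeInformation<?>[] typeInfo) {
            return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
        }

        @Override
        @SuppressWarnings("unchecked")
        protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
            return (Class) JsonRowDeserializationSchema.class;
        }

        @Override
        @SuppressWarnings("unchecked")
        protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
            return (Class) FlinkKafkaConsumer09.class;
        }
    }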
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..5cec4f0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base for the Kafka tests. It brings up:
+ * <ul>
+ * <li>A ZooKeeper mini cluster</li>
+ * <li>Three Kafka Brokers (mini clusters)</li>
+ * <li>A Flink mini cluster</li>
+ * </ul>
+ *
+ * <p>Code in this test is based on the following GitHub repository:
+ * <a href="https://github.com/sakserv/hadoop-mini-clusters">
+ * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
+ * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaTestBase extends TestLogger {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
+
+ protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+ protected static String brokerConnectionStrings;
+
+ protected static Properties standardProps;
+
+ protected static LocalFlinkMiniCluster flink;
+
+ protected static int flinkPort;
+
+ protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ protected static KafkaTestEnvironment kafkaServer;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ protected static Properties secureProps = new Properties();
+
+ // ------------------------------------------------------------------------
+ // Setup and teardown of the mini clusters
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void prepare() throws IOException, ClassNotFoundException {
+
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ startClusters(false);
+
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Shut down KafkaTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ shutdownClusters();
+
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" KafkaTestBase finished");
+ LOG.info("-------------------------------------------------------------------------");
+ }
+
+ protected static Configuration getFlinkConfiguration() {
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+ flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+ flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+ return flinkConfig;
+ }
+
+ protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+
+ // dynamically load the implementation for the test
+ Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+ kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+ LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+ kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+ standardProps = kafkaServer.getStandardProperties();
+
+ brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+ if (secureMode) {
+ if (!kafkaServer.isSecureRunSupported()) {
+ throw new IllegalStateException(
+ "Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
+ }
+ secureProps = kafkaServer.getSecureProperties();
+ }
+
+ // also start a re-usable Flink mini cluster
+ flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
+ flink.start();
+
+ flinkPort = flink.getLeaderRPCPort();
+
+ }
+
+ protected static void shutdownClusters() {
+
+ flinkPort = -1;
+ if (flink != null) {
+ flink.shutdown();
+ }
+
+ if (secureProps != null) {
+ secureProps.clear();
+ }
+
+ kafkaServer.shutdown();
+
+ }
+
+ // ------------------------------------------------------------------------
+ // Execution utilities
+ // ------------------------------------------------------------------------
+
+ protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ throw root;
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+ protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+ kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
+ }
+
+ protected static void deleteTestTopic(String topic) {
+ kafkaServer.deleteTestTopic(topic);
+ }
+
+}
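
tryExecutePropagateExceptions() treats a nested SuccessException as a successful run,
so a typical consumer test ends its otherwise unbounded streaming job by throwing one
from a sink once all expected records have arrived. A sketch (the sink below is
invented for illustration):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.test.util.SuccessException;

    public class CountingSuccessSink extends RichSinkFunction<String> {

        private final int expected;
        private int seen;

        public CountingSuccessSink(int expected) {
            this.expected = expected;
        }

        @Override
        public void invoke(String value) throws Exception {
            if (++seen >= expected) {
                // unwound by tryExecutePropagateExceptions and treated as success
                throw new SuccessException();
            }
        }
    }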
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
new file mode 100644
index 0000000..10c7b86
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.server.KafkaServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Abstract base class providing a Kafka test environment; version-specific
+ * connector modules supply the concrete implementation.
+ */
+public abstract class KafkaTestEnvironment {
+
+ protected static final String KAFKA_HOST = "localhost";
+
+ public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
+
+ public void prepare(int numberOfKafkaServers, boolean secureMode) {
+ this.prepare(numberOfKafkaServers, null, secureMode);
+ }
+
+ public abstract void shutdown();
+
+ public abstract void deleteTestTopic(String topic);
+
+ public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties);
+
+ public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+ this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
+ }
+
+ public abstract Properties getStandardProperties();
+
+ public abstract Properties getSecureProperties();
+
+ public abstract String getBrokerConnectionString();
+
+ public abstract String getVersion();
+
+ public abstract List<KafkaServer> getBrokers();
+
+ // -- consumer / producer instances:
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
+ return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
+ }
+
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) {
+ return getConsumer(Collections.singletonList(topic), readSchema, props);
+ }
+
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+ return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
+ }
+
+ public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
+
+ public abstract <T> StreamSink<T> getProducerSink(String topic,
+ KeyedSerializationSchema<T> serSchema, Properties props,
+ KafkaPartitioner<T> partitioner);
+
+ public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
+ KeyedSerializationSchema<T> serSchema, Properties props,
+ KafkaPartitioner<T> partitioner);
+
+ // -- offset handlers
+
+ public interface KafkaOffsetHandler {
+ Long getCommittedOffset(String topicName, int partition);
+ void close();
+ }
+
+ public abstract KafkaOffsetHandler createOffsetHandler(Properties props);
+
+ // -- leader failure simulation
+
+ public abstract void restartBroker(int leaderId) throws Exception;
+
+ public abstract int getLeaderToShutDown(String topic) throws Exception;
+
+ public abstract int getBrokerId(KafkaServer server);
+
+ public abstract boolean isSecureRunSupported();
+
+}
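
The abstract environment is compiled against no particular Kafka version; each
connector module ships a class named
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl on its test
classpath, and the test bases locate it reflectively. This mirrors the lookup
performed in KafkaTestBase.startClusters():

    import org.apache.flink.util.InstantiationUtil;

    final class KafkaTestEnvironmentLoader {
        static KafkaTestEnvironment load() throws ClassNotFoundException {
            // which implementation is found depends on the module under test
            Class<?> clazz = Class.forName(
                    "org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
            return (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
        }
    }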
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..5dab05a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFixedPartitioner {
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FixedPartitioner<String> part = new FixedPartitioner<>();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+ part.open(1, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
+
+ part.open(2, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
+ Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
+
+ part.open(3, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FixedPartitioner<String> part = new FixedPartitioner<>();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2, partitions);
+ Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+ Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+ part.open(1, 2, partitions);
+ Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+ Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ * </pre>
+ */
+ @Test
+ public void testMixedCase() {
+ FixedPartitioner<String> part = new FixedPartitioner<>();
+ int[] partitions = new int[]{0, 1};
+
+ part.open(0, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+ part.open(1, 3, partitions);
+ Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+
+ part.open(2, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+ }
+
+}
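
All three cases are consistent with FixedPartitioner pinning each sink subtask to a
single partition and wrapping around when sinks outnumber partitions. As a model of
the contract the assertions pin down (a sketch of the expected behavior, not the
actual implementation):

    final class FixedPartitionerModel {
        // subtask i always writes to partitions[i % partitions.length]
        static int expectedPartition(int subtaskIndex, int[] partitions) {
            return partitions[subtaskIndex % partitions.length];
        }
    }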
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..0b3507a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+
+ @Test
+ public void testPunctuatedWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+ new KafkaTopicPartition(testTopic, 7),
+ new KafkaTopicPartition(testTopic, 13),
+ new KafkaTopicPartition(testTopic, 21));
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ null, /* periodic watermark assigner */
+ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+ processingTimeProvider,
+ 0);
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+ // elements generate a watermark if the timestamp is a multiple of three
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+ assertFalse(sourceContext.hasWatermark());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ // now, we should have a watermark
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(14L, part2, 3L);
+ assertFalse(sourceContext.hasWatermark());
+ fetcher.emitRecord(15L, part2, 3L);
+ assertTrue(sourceContext.hasWatermark());
+ assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testPeriodicWatermarks() throws Exception {
+ final String testTopic = "test topic name";
+ List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+ new KafkaTopicPartition(testTopic, 7),
+ new KafkaTopicPartition(testTopic, 13),
+ new KafkaTopicPartition(testTopic, 21));
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+ null, /* punctuated watermarks assigner */
+ processingTimeService,
+ 10);
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+ // the periodic assigner tracks the max timestamp per partition; watermarks
+ // are only emitted when the processing-time timer fires
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ processingTimeService.setCurrentTime(10);
+
+ // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+ processingTimeService.setCurrentTime(20);
+
+ // this blocks until the periodic thread emitted the watermark
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ fetcher.emitRecord(14L, part2, 3L);
+ fetcher.emitRecord(15L, part2, 3L);
+
+ processingTimeService.setCurrentTime(30);
+ // this blocks until the periodic thread emitted the watermark
+ long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+ assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+ }
+
+ // ------------------------------------------------------------------------
+ // Test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+ protected TestFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval) throws Exception
+ {
+ super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
+ }
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ return new Object();
+ }
+
+ @Override
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TestSourceContext<T> implements SourceContext<T> {
+
+ private final Object checkpointLock = new Object();
+ private final Object watermarkLock = new Object();
+
+ private volatile StreamRecord<T> latestElement;
+ private volatile Watermark currentWatermark;
+
+ @Override
+ public void collect(T element) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ this.latestElement = new StreamRecord<>(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ synchronized (watermarkLock) {
+ currentWatermark = mark;
+ watermarkLock.notifyAll();
+ }
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {}
+
+ public StreamRecord<T> getLatestElement() {
+ return latestElement;
+ }
+
+ public boolean hasWatermark() {
+ return currentWatermark != null;
+ }
+
+ public Watermark getLatestWatermark() throws InterruptedException {
+ synchronized (watermarkLock) {
+ while (currentWatermark == null) {
+ watermarkLock.wait();
+ }
+ Watermark wm = currentWatermark;
+ currentWatermark = null;
+ return wm;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+ private volatile long maxTimestamp = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ maxTimestamp = Math.max(maxTimestamp, element);
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(maxTimestamp);
+ }
+ }
+
+ private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return element;
+ }
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+ return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+ }
+
+ }
+}
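
Both tests hinge on the fetcher aggregating per-partition watermarks with a minimum:
the overall watermark only advances once the slowest partition advances, which is why
emitting 30 on partition 1 merely lifts the watermark to partition 2's value of 12.
The aggregation step looks roughly like this (a sketch, not the fetcher's actual code):

    final class WatermarkAggregation {
        // overall watermark = min over all per-partition watermarks
        static long minAcrossPartitions(long[] perPartitionWatermarks) {
            long min = Long.MAX_VALUE;
            for (long wm : perPartitionWatermarks) {
                min = Math.min(min, wm);
            }
            return min;
        }
    }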
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.*;
+
+public class KafkaTopicPartitionTest {
+
+ @Test
+ public void validateUid() {
+ Field uidField;
+ try {
+ uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+ uidField.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ fail("serialVersionUID is not defined");
+ return;
+ }
+
+ assertTrue(Modifier.isStatic(uidField.getModifiers()));
+ assertTrue(Modifier.isFinal(uidField.getModifiers()));
+ assertTrue(Modifier.isPrivate(uidField.getModifiers()));
+
+ assertEquals(long.class, uidField.getType());
+
+ // the UID has to be constant to make sure old checkpoints/savepoints can be read
+ try {
+ assertEquals(722083576322742325L, uidField.getLong(null));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
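
The reflective checks matter because Java serialization only honors a
serialVersionUID declared as a static final long; anything else is silently ignored
and a UID is computed from the class shape instead, which can break reading of old
checkpoints/savepoints. (The private modifier is convention, which the test enforces
as well.) An illustrative conforming declaration (class name invented):

    import java.io.Serializable;

    public final class CheckpointStableExample implements Serializable {
        // exactly the form validateUid() checks, pinned to a constant so that
        // old checkpoints/savepoints remain readable
        private static final long serialVersionUID = 722083576322742325L;
    }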
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..9e8e1d9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+
+@SuppressWarnings("serial")
+public class DataGenerators {
+
+ public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+ KafkaTestEnvironment testServer, String topic,
+ final int numPartitions,
+ final int numElements,
+ final boolean randomizeOrder) throws Exception {
+ env.setParallelism(numPartitions);
+ env.getConfig().disableSysoutLogging();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ DataStream<Integer> stream = env.addSource(
+ new RichParallelSourceFunction<Integer>() {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) {
+ // create a sequence
+ int[] elements = new int[numElements];
+ for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+ i < numElements;
+ i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+ elements[i] = val;
+ }
+
+ // scramble the sequence
+ if (randomizeOrder) {
+ Random rnd = new Random();
+ for (int i = 0; i < elements.length; i++) {
+ int otherPos = rnd.nextInt(elements.length);
+
+ int tmp = elements[i];
+ elements[i] = elements[otherPos];
+ elements[otherPos] = tmp;
+ }
+ }
+
+ // emit the sequence
+ int pos = 0;
+ while (running && pos < elements.length) {
+ ctx.collect(elements[pos++]);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ Properties props = new Properties();
+ props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+ Properties secureProps = testServer.getSecureProperties();
+ if (secureProps != null) {
+ props.putAll(secureProps);
+ }
+
+ stream = stream.rebalance();
+ testServer.produceIntoKafka(stream, topic,
+ new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+ props,
+ new KafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+ return next % numPartitions;
+ }
+ });
+
+ env.execute("Scrambles int sequence generator");
+ }
+
+ // ------------------------------------------------------------------------
+
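+ /**
+ * A thread that drives a Kafka producer sink through a test harness, feeding it random
+ * strings (1 to 100 characters from 'a' to 't') until {@link #shutdown()} is called.
+ * Any failure is stored and can later be retrieved via {@link #getError()}.
+ */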
+ public static class InfiniteStringsGenerator extends Thread {
+
+ private final KafkaTestEnvironment server;
+
+ private final String topic;
+
+ private volatile Throwable error;
+
+ private volatile boolean running = true;
+
+
+ public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
+ this.server = server;
+ this.topic = topic;
+ }
+
+ @Override
+ public void run() {
+ // we manually feed data into the Kafka sink
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = null;
+ try {
+ Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+ producerProperties.setProperty("retries", "3");
+ StreamTransformation<String> mockTransform = new MockStreamTransformation();
+ DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+
+ StreamSink<String> sink = server.getProducerSink(
+ topic,
+ new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+ producerProperties,
+ new FixedPartitioner<String>());
+
+ testHarness = new OneInputStreamOperatorTestHarness<>(sink);
+
+ testHarness.open();
+
+ final StringBuilder bld = new StringBuilder();
+ final Random rnd = new Random();
+
+ while (running) {
+ bld.setLength(0);
+
+ int len = rnd.nextInt(100) + 1;
+ for (int i = 0; i < len; i++) {
+ bld.append((char) (rnd.nextInt(20) + 'a'));
+ }
+
+ String next = bld.toString();
+ testHarness.processElement(new StreamRecord<>(next));
+ }
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ finally {
+ if (testHarness != null) {
+ try {
+ testHarness.close();
+ }
+ catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.running = false;
+ this.interrupt();
+ }
+
+ public Throwable getError() {
+ return this.error;
+ }
+
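+ /**
+ * A minimal {@link StreamTransformation} stub, just enough to construct a
+ * {@code DataStream} outside of a real program.
+ */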
+ private static class MockStreamTransformation extends StreamTransformation<String> {
+ public MockStreamTransformation() {
+ super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ return null;
+ }
+ }
+
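+ /** A stream execution environment whose {@code execute()} does nothing, for harness-only use. */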
+ public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..2bd400c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
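+/**
+ * An identity mapper that throws an artificial exception once a configured number of
+ * elements has passed through it. Only the subtask with index 0 triggers the failure;
+ * the static flags record whether a failure already happened and whether a checkpoint
+ * completed before it, so tests can assert on recovery behavior.
+ */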
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+ Checkpointed<Integer>, CheckpointListener, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean failedBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+ private final int failCount;
+ private int numElementsTotal;
+ private int numElementsThisTime;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+ private Thread printer;
+ private volatile boolean printerRunning = true;
+
+ public FailingIdentityMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ printer = new Thread(this, "FailingIdentityMapper Status Printer");
+ printer.start();
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+ numElementsThisTime++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ failedBefore = true;
+ throw new Exception("Artificial Test Failure");
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void close() throws Exception {
+ printerRunning = false;
+ if (printer != null) {
+ printer.interrupt();
+ printer = null;
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsTotal;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ numElementsTotal = state;
+ }
+
+ @Override
+ public void run() {
+ while (printerRunning) {
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ numElementsThisTime, numElementsTotal);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
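+/**
+ * Provides a minimal but complete Kafka producer configuration whose broker address is
+ * not expected to be reachable, for tests that only need the configuration to pass validation.
+ */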
+public class FakeStandardProducerConfig {
+
+ public static Properties get() {
+ Properties p = new Properties();
+ p.setProperty("bootstrap.servers", "localhost:12345");
+ p.setProperty("key.serializer", ByteArraySerializer.class.getName());
+ p.setProperty("value.serializer", ByteArraySerializer.class.getName());
+ return p;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..acdad5a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
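+/**
+ * Utilities for talking to a JobManager {@link ActorGateway} from tests: waiting until no
+ * job is running, and cancelling the currently running job.
+ */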
+public class JobManagerCommunicationUtils {
+
+ private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+
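+ /** Polls the JobManager every 50 ms until it reports no running jobs. */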
+ public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
+ while (true) {
+ // find the jobID
+ Future<Object> listResponse = jobManager.ask(
+ JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
+
+ Object result = Await.result(listResponse, askTimeout);
+ List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+
+
+ if (jobs.isEmpty()) {
+ return;
+ }
+
+ Thread.sleep(50);
+ }
+ }
+
+ public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+ cancelCurrentJob(jobManager, null);
+ }
+
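+ /**
+ * Cancels the currently running job, polling up to 200 times (50 ms apart) until a job
+ * shows up. If multiple jobs are running, a non-null {@code name} selects the job to cancel.
+ */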
+ public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
+ JobStatusMessage status = null;
+
+ for (int i = 0; i < 200; i++) {
+ // find the jobID
+ Future<Object> listResponse = jobManager.ask(
+ JobManagerMessages.getRequestRunningJobsStatus(),
+ askTimeout);
+
+ List<JobStatusMessage> jobs;
+ try {
+ Object result = Await.result(listResponse, askTimeout);
+ jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+ }
+ catch (Exception e) {
+ throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+ }
+
+ if (jobs.isEmpty()) {
+ // try again, fall through the loop
+ Thread.sleep(50);
+ }
+ else if (jobs.size() == 1) {
+ status = jobs.get(0);
+ // job found, stop polling
+ break;
+ }
+ else if (name != null) {
+ for (JobStatusMessage msg : jobs) {
+ if (msg.getJobName().equals(name)) {
+ status = msg;
+ }
+ }
+ if (status == null) {
+ throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
+ }
+ // matching job found, stop polling
+ break;
+ }
+ else {
+ StringBuilder jobNames = new StringBuilder();
+ for (JobStatusMessage jsm : jobs) {
+ jobNames.append(jsm.getJobName()).append(", ");
+ }
+ throw new Exception("Could not cancel job - more than one running job: " + jobNames);
+ }
+ }
+
+ if (status == null) {
+ throw new Exception("Could not cancel job - no running jobs");
+ }
+ else if (status.getJobState().isGloballyTerminalState()) {
+ throw new Exception("Could not cancel job - job is not running any more");
+ }
+
+ JobID jobId = status.getJobId();
+
+ Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
+ try {
+ Await.result(response, askTimeout);
+ }
+ catch (Exception e) {
+ throw new Exception("Sending the 'cancel' message failed.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..e105e01
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
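+/**
+ * An identity mapper that derives each element's partition as {@code value % numPartitions}
+ * and fails as soon as it has seen elements from more than {@code maxPartitions} distinct
+ * partitions.
+ */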
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+ private static final long serialVersionUID = 1088381231244959088L;
+
+ /* the partitions from which this function received data */
+ private final Set<Integer> myPartitions = new HashSet<>();
+
+ private final int numPartitions;
+ private final int maxPartitions;
+
+ public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+ this.numPartitions = numPartitions;
+ this.maxPartitions = maxPartitions;
+ }
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ // derive the expected partition from the value and track how many distinct partitions this subtask sees
+ int partition = value % numPartitions;
+ myPartitions.add(partition);
+ if (myPartitions.size() > maxPartitions) {
+ throw new Exception("Error: Elements from too many different partitions: " + myPartitions
+ + ". Expected elements from at most " + maxPartitions + " partitions");
+ }
+ }
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * An identity map function that sleeps between elements, throttling the
+ * processing speed.
+ *
+ * @param <T> The type mapped.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+ private static final long serialVersionUID = 467008933767159126L;
+
+ private final int sleep;
+
+ public ThrottledMapper(int sleep) {
+ this.sleep = sleep;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ Thread.sleep(this.sleep);
+ return value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..c9e9ac1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int expectedPartitions;
+
+ public Tuple2Partitioner(int expectedPartitions) {
+ this.expectedPartitions = expectedPartitions;
+ }
+
+ @Override
+ public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+ if (numPartitions != expectedPartitions) {
+ throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions, but found " + numPartitions);
+ }
+
+ return next.f0;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..7813561
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
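+/**
+ * A sink that validates exactly-once delivery of the integers {@code 0..numElementsTotal - 1}:
+ * it fails on any duplicate, and once the expected number of elements has arrived it checks
+ * that no value is missing, signalling success via a {@link SuccessException}. Both the
+ * element count and the duplicate tracker are checkpointed.
+ */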
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+ private static final long serialVersionUID = 1748426382527469932L;
+
+ private final int numElementsTotal;
+
+ private BitSet duplicateChecker = new BitSet(); // this is checkpointed
+
+ private int numElements; // this is checkpointed
+
+
+ public ValidatingExactlyOnceSink(int numElementsTotal) {
+ this.numElementsTotal = numElementsTotal;
+ }
+
+
+ @Override
+ public void invoke(Integer value) throws Exception {
+ numElements++;
+
+ if (duplicateChecker.get(value)) {
+ throw new Exception("Received a duplicate: " + value);
+ }
+ duplicateChecker.set(value);
+ if (numElements == numElementsTotal) {
+ // validate
+ if (duplicateChecker.cardinality() != numElementsTotal) {
+ throw new Exception("Duplicate checker has wrong cardinality");
+ }
+ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+ throw new Exception("Received sparse sequence");
+ }
+ else {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ @Override
+ public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+ LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
+ return new Tuple2<>(numElements, duplicateChecker);
+ }
+
+ @Override
+ public void restoreState(Tuple2<Integer, BitSet> state) {
+ LOG.info("restoring num elements to {}", state.f0);
+ this.numElements = state.f0;
+ this.duplicateChecker = state.f1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..8a4c408
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+ private static final Charset CHARSET = Charset.forName("UTF-8");
+
+ @Override
+ public byte[] serialize(Object data) {
+ if (data instanceof String) {
+ return ((String) data).getBytes(CHARSET);
+ }
+ else {
+ throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ else {
+ return new String(bytes, CHARSET);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file