Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:49 UTC
[32/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
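
The metadata retrieval that replaces the copied Kafka classes can be done against the brokers directly. A minimal sketch of such a lookup with Kafka 0.8's SimpleConsumer API, assuming placeholder broker host/port, client id, and topic name, and omitting the retry and error handling a real connector needs:

    import java.util.Collections;

    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class TopicMetadataLookup {

        public static void main(String[] args) {
            // ask one bootstrap broker for the partition layout of a topic
            SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "metadata-lookup");
            try {
                TopicMetadataResponse response =
                        consumer.send(new TopicMetadataRequest(Collections.singletonList("myTopic")));
                for (TopicMetadata topic : response.topicsMetadata()) {
                    for (PartitionMetadata partition : topic.partitionsMetadata()) {
                        System.out.println("partition " + partition.partitionId() + " leader: "
                                + (partition.leader() == null ? "none" : partition.leader().connectionString()));
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
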
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
deleted file mode 100644
index 8c1883e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,1225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.collection.Seq;
-
-/**
- * Code in this test is based on the following GitHub repository
- * (as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef):
- * <p/>
- * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
- */
-@SuppressWarnings("serial")
-public class KafkaITCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
- private static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
- private static int zkPort;
- private static String kafkaHost;
-
- private static String zookeeperConnectionString;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
- public static File tmpZkDir;
- public static List<File> tmpKafkaDirs;
-
- private static TestingServer zookeeper;
- private static List<KafkaServer> brokers;
- private static String brokerConnectionStrings = "";
-
- private static ConsumerConfig standardCC;
-
- private static ZkClient zkClient;
-
-
- @BeforeClass
- public static void prepare() throws IOException {
- LOG.info("Starting KafkaITCase.prepare()");
- tmpZkDir = tempFolder.newFolder();
-
- tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- tmpKafkaDirs.add(tempFolder.newFolder());
- }
-
- kafkaHost = InetAddress.getLocalHost().getHostName();
- zkPort = NetUtils.getAvailablePort();
- zookeeperConnectionString = "localhost:" + zkPort;
-
- zookeeper = null;
- brokers = null;
-
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = getZookeeper();
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- SocketServer socketServer = brokers.get(i).socketServer();
- String host = "localhost";
- if(socketServer.host() != null) {
- host = socketServer.host();
- }
- brokerConnectionStrings += host+":"+socketServer.port()+",";
- }
-
- LOG.info("ZK and KafkaServer started.");
- } catch (Throwable t) {
- LOG.warn("Test failed with exception", t);
- Assert.fail("Test failed with: " + t.getMessage());
- }
-
- Properties cProps = new Properties();
- cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- cProps.setProperty("group.id", "flink-tests");
- cProps.setProperty("auto.commit.enable", "false");
-
- cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
-
- standardCC = new ConsumerConfig(cProps);
-
- zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
- }
-
- @AfterClass
- public static void shutDownServices() {
- LOG.info("Shutting down all services");
- for (KafkaServer broker : brokers) {
- if (broker != null) {
- broker.shutdown();
- }
- }
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- } catch (IOException e) {
- LOG.warn("ZK.stop() failed", e);
- }
- }
- zkClient.close();
- }
-
- // -------------------------- test checkpointing ------------------------
- @Test
- public void testCheckpointing() throws Exception {
- createTestTopic("testCheckpointing", 1, 1);
-
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", zookeeperConnectionString);
- props.setProperty("group.id", "testCheckpointing");
- props.setProperty("auto.commit.enable", "false");
- ConsumerConfig cc = new ConsumerConfig(props);
- PersistentKafkaSource<String> source = new PersistentKafkaSource<String>("testCheckpointing", new FakeDeserializationSchema(), cc);
-
-
- Field pendingCheckpointsField = PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints");
- pendingCheckpointsField.setAccessible(true);
- LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-
- Assert.assertEquals(0, pendingCheckpoints.size());
- // first restore
- source.restoreState(new long[]{1337});
- // then open
- source.open(new Configuration());
- long[] state1 = source.snapshotState(1, 15);
- Assert.assertArrayEquals(new long[]{1337}, state1);
- long[] state2 = source.snapshotState(2, 30);
- Assert.assertArrayEquals(new long[]{1337}, state2);
- Assert.assertEquals(2, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(1);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(2);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(666); // invalid checkpoint
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- // create 500 snapshots
- for(int i = 0; i < 500; i++) {
- source.snapshotState(i, 15 * i);
- }
- Assert.assertEquals(500, pendingCheckpoints.size());
-
- // commit only the second last
- source.notifyCheckpointComplete(498);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- // access invalid checkpoint
- source.notifyCheckpointComplete(490);
-
- // and the last
- source.notifyCheckpointComplete(499);
- Assert.assertEquals(0, pendingCheckpoints.size());
- }
-
-
- private static class FakeDeserializationSchema implements DeserializationSchema<String> {
-
- @Override
- public String deserialize(byte[] message) {
- return null;
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return null;
- }
- }
-
- // ---------------------------------------------------------------
-
-
- @Test
- public void testOffsetManipulation() {
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
- final String topicName = "testOffsetManipulation";
-
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topicName);
- AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
-
- Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
-
- zk.close();
- }
-
- public static class TestPersistentKafkaSource<OUT> extends PersistentKafkaSource<OUT> {
- private static Object sync = new Object();
- public static long[] finalOffset;
- public TestPersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
- super(topicName, deserializationSchema, consumerConfig);
- }
-
- @Override
- public void close() {
- super.close();
- LOG.info("Starting close " +Arrays.toString(commitedOffsets));
- synchronized (sync) {
- if (finalOffset == null) {
- finalOffset = new long[commitedOffsets.length];
- }
- for(int i = 0; i < commitedOffsets.length; i++) {
- if(commitedOffsets[i] > 0) {
- if(finalOffset[i] > 0) {
- throw new RuntimeException("This is unexpected on i = "+i);
- }
- finalOffset[i] = commitedOffsets[i];
- }
- }
- }
- LOG.info("Finished closing. Final "+Arrays.toString(finalOffset));
- }
- }
- /**
- * We want to use the high-level Java consumer API but manage the offsets in ZooKeeper manually.
- *
- */
- @Test
- public void testPersistentSourceWithOffsetUpdates() throws Exception {
- LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
-
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
- final String topicName = "testOffsetHacking";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
- env.getConfig().disableSysoutLogging();
- env.enableCheckpointing(50);
- env.setNumberOfExecutionRetries(0);
-
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topicName);
- AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
- // write a sequence from 0 to 99 to each of the three partitions.
- writeSequence(env, topicName, 0, 99);
-
- readSequence(env, standardCC, topicName, 0, 100, 300);
-
- LOG.info("State in persistent kafka sources {}", TestPersistentKafkaSource.finalOffset);
-
- // Check that the offsets are set to at least 50.
- // Ideally we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending
- // checkpoints have been committed.
- // To work around that limitation, the persistent Kafka consumer is throttled with a Thread.sleep().
-
- long o1 = -1, o2 = -1, o3 = -1;
- if(TestPersistentKafkaSource.finalOffset[0] > 0) {
- o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
- Assert.assertTrue("The offset seems incorrect, got " + o1, o1 == TestPersistentKafkaSource.finalOffset[0]);
- }
- if(TestPersistentKafkaSource.finalOffset[1] > 0) {
- o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
- Assert.assertTrue("The offset seems incorrect, got " + o2, o2 == TestPersistentKafkaSource.finalOffset[1]);
- }
- if(TestPersistentKafkaSource.finalOffset[2] > 0) {
- o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
- Assert.assertTrue("The offset seems incorrect, got " + o3, o3 == TestPersistentKafkaSource.finalOffset[2]);
- }
- Assert.assertFalse("no offset has been set", TestPersistentKafkaSource.finalOffset[0] == 0 &&
- TestPersistentKafkaSource.finalOffset[1] == 0 &&
- TestPersistentKafkaSource.finalOffset[2] == 0);
- LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
- LOG.info("Manipulating offsets");
- // set the offset to 50 for the three partitions
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
-
- // create new env
- env = StreamExecutionEnvironment.createLocalEnvironment(3);
- env.getConfig().disableSysoutLogging();
- readSequence(env, standardCC, topicName, 50, 50, 150);
-
- zk.close();
-
- LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
- }
-
- private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
- LOG.info("Reading sequence for verification until final count {}", finalCount);
-
- TypeInformation<Tuple2<Integer, Integer>> tuple2info = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- TestPersistentKafkaSource<Tuple2<Integer, Integer>> pks = new TestPersistentKafkaSource<>(topicName,
- new TypeInformationSerializationSchema<>(tuple2info, env.getConfig()), cc);
-
- DataStream<Tuple2<Integer, Integer>> source = env.addSource(pks).map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
- // we need to slow down the source so that it can participate in a few checkpoints.
- // Otherwise it would write its data into buffers and shut down.
- @Override
- public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
- Thread.sleep(50);
- return value;
- }
- });
-
- // verify data
- DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
- private static final long serialVersionUID = 1L;
-
- int[] values = new int[valuesCount];
- int count = 0;
-
- @Override
- public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
- values[value.f1 - valuesStartFrom]++;
- count++;
-
- LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
- // verify if we've seen everything
- if (count == finalCount) {
- LOG.info("Received all values");
- for (int i = 0; i < values.length; i++) {
- int v = values[i];
- if (v != 3) {
- LOG.warn("Test is going to fail");
- printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
- throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
- }
- }
- // test has passed
- throw new SuccessException();
- }
- }
-
- }).setParallelism(1);
-
- tryExecute(env, "Read data from Kafka");
-
- LOG.info("Successfully read sequence for verification");
- }
-
-
-
- private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
- LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
-
- TypeInformation<Tuple2<Integer, Integer>> tuple2info = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = from;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
- while (running) {
- LOG.info("Writing " + cnt + " to partition " + partition);
- ctx.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(),
- cnt));
- if (cnt == to) {
- LOG.info("Writer reached end.");
- return;
- }
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- }).setParallelism(3);
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
- topicName,
- new TypeInformationSerializationSchema<>(tuple2info, env.getConfig()),
- new T2Partitioner()
- )).setParallelism(3);
- env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
- LOG.info("Finished writing sequence");
- }
-
- private static class T2Partitioner implements SerializableKafkaPartitioner {
- private static final long serialVersionUID = 1L;
-
- @Override
- public int partition(Object key, int numPartitions) {
- if(numPartitions != 3) {
- throw new IllegalArgumentException("Expected three partitions");
- }
-
- @SuppressWarnings("unchecked")
- Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
- return element.f0;
- }
- }
-
-
- @Test
- public void regularKafkaSourceTest() throws Exception {
- LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
-
- String topic = "regularKafkaSourceTestTopic";
- createTestTopic(topic, 1, 1);
-
- TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup",
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()), 5000));
-
- consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got value = " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = 0;
- while (running) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic,
- new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig())));
-
- tryExecute(env, "regular kafka source test");
-
- LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
- }
-
- @Test
- public void tupleTestTopology() throws Exception {
- LOG.info("Starting KafkaITCase.tupleTestTopology()");
-
- String topic = "tupleTestTopic";
- createTestTopic(topic, 1, 1);
-
- TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<>(topic,
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()),
- standardCC
- ));
- consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.info("Got value " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- Assert.assertTrue("No element received", elCnt > 0);
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = 0;
- while (running) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
- LOG.info("Produced " + cnt);
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic,
- new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig())));
-
- tryExecute(env, "tupletesttopology");
-
- LOG.info("Finished KafkaITCase.tupleTestTopology()");
- }
-
- /**
- * Test Flink's Kafka integration also with very big records (30MB)
- *
- * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
- *
- * @throws Exception
- */
- @Test
- public void bigRecordTestTopology() throws Exception {
-
- LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
-
- String topic = "bigRecordTestTopic";
- createTestTopic(topic, 1, 1);
-
- final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
- new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, env.getConfig());
-
- Properties consumerProps = new Properties();
- consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
- consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- consumerProps.setProperty("group.id", "test");
- consumerProps.setProperty("auto.commit.enable", "false");
- consumerProps.setProperty("auto.offset.reset", "smallest");
-
- ConsumerConfig cc = new ConsumerConfig(consumerProps);
- DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
-
- consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
- private static final long serialVersionUID = 1L;
-
- int elCnt = 0;
-
- @Override
- public void invoke(Tuple2<Long, byte[]> value) throws Exception {
- LOG.info("Received {}", value.f0);
- elCnt++;
- if(value.f0 == -1) {
- // we should have seen 11 elements now.
- if(elCnt == 11) {
- throw new SuccessException();
- } else {
- throw new RuntimeException("There have been "+elCnt+" elements");
- }
- }
- if(elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: "+elCnt);
- }
- }
- }).setParallelism(1);
-
- // add producing topology
- DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
- private static final long serialVersionUID = 1L;
- boolean running;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- running = true;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
- LOG.info("Starting source.");
- long cnt = 0;
- Random rnd = new Random(1337);
- while (running) {
- //
- byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
- ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
- LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- if(cnt == 10) {
- LOG.info("Send end signal");
- // signal end
- ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
- running = false;
- }
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- });
-
- stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
- new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, env.getConfig()))
- );
-
- tryExecute(env, "big topology test");
-
- LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
- }
-
-
- @Test
- public void customPartitioningTestTopology() throws Exception {
- LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
-
- String topic = "customPartitioningTestTopic";
-
- createTestTopic(topic, 3, 1);
-
- final TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, String>>(topic,
- new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig()),
- standardCC));
- consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- int start = -1;
- BitSet validator = new BitSet(101);
-
- boolean gotPartition1 = false;
- boolean gotPartition2 = false;
- boolean gotPartition3 = false;
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- switch (v) {
- case 9:
- gotPartition1 = true;
- break;
- case 19:
- gotPartition2 = true;
- break;
- case 99:
- gotPartition3 = true;
- break;
- }
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
-
- if (gotPartition1 && gotPartition2 && gotPartition3) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = 0;
- while (running) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic,
- new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig()), new CustomPartitioner()));
-
- tryExecute(env, "custom partitioning test");
-
- LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
- }
-
- /**
- * This is for a topic with 3 partitions and Tuple2<Long, String>
- */
- private static class CustomPartitioner implements SerializableKafkaPartitioner {
- private static final long serialVersionUID = 1L;
-
- @Override
- public int partition(Object key, int numPartitions) {
-
- @SuppressWarnings("unchecked")
- Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
- if (tuple.f0 < 10) {
- return 0;
- } else if (tuple.f0 < 20) {
- return 1;
- } else {
- return 2;
- }
- }
- }
-
-
- @Test
- public void simpleTestTopology() throws Exception {
- String topic = "simpleTestTopic";
-
- createTestTopic(topic, 1, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<String> consuming = env.addSource(
- new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
- consuming.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(String value) throws Exception {
- LOG.debug("Got " + value);
- String[] sp = value.split("-");
- int v = Integer.parseInt(sp[1]);
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<String> stream = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = 0;
- while (running) {
- ctx.collect("kafka-" + cnt++);
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got cancel()");
- running = false;
- }
- });
- stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
-
- tryExecute(env, "simpletest");
- }
-
- private static boolean leaderHasShutDown = false;
- private static boolean shutdownKafkaBroker;
-
- @Test(timeout=60000)
- public void brokerFailureTest() throws Exception {
- String topic = "brokerFailureTestTopic";
-
- createTestTopic(topic, 2, 2);
-
- // --------------------------- write data to topic ---------------------
- LOG.info("Writing data to topic {}", topic);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- DataStream<String> stream = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- LOG.info("Starting source.");
- int cnt = 0;
- while (running) {
- String msg = "kafka-" + cnt++;
- ctx.collect(msg);
- LOG.info("sending message = "+msg);
-
- if ((cnt - 1) % 20 == 0) {
- LOG.debug("Sending message #{}", cnt - 1);
- }
- if(cnt == 200) {
- LOG.info("Stopping to produce after 200 msgs");
- break;
- }
-
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("Source got chancel()");
- running = false;
- }
- });
- stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
- .setParallelism(1);
-
- tryExecute(env, "broker failure test - writer");
-
- // --------------------------- read and let broker fail ---------------------
-
- LOG.info("Reading data from topic {} and let a broker fail", topic);
- PartitionMetadata firstPart = null;
- do {
- if(firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
- } while(firstPart.errorCode() != 0);
-
- final String leaderToShutDown = firstPart.leader().get().connectionString();
- LOG.info("Leader to shutdown {}", leaderToShutDown);
-
- final Thread brokerShutdown = new Thread(new Runnable() {
- @Override
- public void run() {
- shutdownKafkaBroker = false;
- while (!shutdownKafkaBroker) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
- for (KafkaServer kafkaServer : brokers) {
- if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
- LOG.info("Killing Kafka Server {}", leaderToShutDown);
- kafkaServer.shutdown();
- leaderHasShutDown = true;
- break;
- }
- }
- }
- });
- brokerShutdown.start();
-
- // add consuming topology:
- DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
- consuming.setParallelism(1);
-
- consuming.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- int elCnt = 0;
- int start = 0;
- int numOfMessagesToBeCorrect = 100;
- int stopAfterMessages = 150;
-
- BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
-
- @Override
- public void invoke(String value) throws Exception {
- LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
- String[] sp = value.split("-");
- int v = Integer.parseInt(sp[1]);
-
- if (start == -1) {
- start = v;
- }
- int offset = v - start;
- Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
- if (v - start < 0 && LOG.isWarnEnabled()) {
- LOG.warn("Not in order: {}", value);
- }
-
- validator.set(offset);
- elCnt++;
- if (elCnt == 20) {
- LOG.info("Asking leading broker to shut down");
- // shut down a Kafka broker
- shutdownKafkaBroker = true;
- }
- if (shutdownKafkaBroker) {
- // we become a bit slower because the shutdown takes some time and we have
- // only a fixed number of elements to read
- Thread.sleep(20);
- }
- if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
- if (elCnt >= stopAfterMessages) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
- throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- }
- });
- tryExecute(env, "broker failure test - reader");
-
- }
-
- public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- if(t == null) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
-
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
- }
-
- private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topic);
- AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
- }
-
- private static TestingServer getZookeeper() throws Exception {
- return new TestingServer(zkPort, tmpZkDir);
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
- Properties kafkaProperties = new Properties();
-
- int kafkaPort = NetUtils.getAvailablePort();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", kafkaHost);
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
-
- public static class SuccessException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
-
- // ----------------------- Debugging utilities --------------------
-
- /**
- * Read topic to list, only using Kafka code.
- */
- private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
- ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
- // we request only one stream per consumer instance. Kafka will make sure that each consumer group
- // will see each message only once.
- Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
- if(streams.size() != 1) {
- throw new RuntimeException("Expected only one message stream but got "+streams.size());
- }
- List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
- if(kafkaStreams == null) {
- throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
- }
- if(kafkaStreams.size() != 1) {
- throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
- }
- LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
- ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
- List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
- int read = 0;
- while(iteratorToRead.hasNext()) {
- read++;
- result.add(iteratorToRead.next());
- if(read == stopAfter) {
- LOG.info("Read "+read+" elements");
- return result;
- }
- }
- return result;
- }
-
- private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema<?> deserializationSchema, int stopAfter){
- List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
- LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
- for(MessageAndMetadata<byte[], byte[]> message: contents) {
- Object out = deserializationSchema.deserialize(message.message());
- LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
- }
- }
-
- private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
- // write the sequence to log for debugging purposes
- Properties stdProps = standardCC.props().props();
- Properties newProps = new Properties(stdProps);
- newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
- newProps.setProperty("auto.offset.reset", "smallest");
- newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
- ConsumerConfig printerConfig = new ConsumerConfig(newProps);
- TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- DeserializationSchema<Tuple2<Integer, Integer>> deserializer =
- new TypeInformationSerializationSchema<>(typeInfo, ec);
- printTopic(topicName, printerConfig, deserializer, elements);
- }
-
-}
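
For reference, the manual offset handling exercised by the deleted testOffsetManipulation and testPersistentSourceWithOffsetUpdates boils down to reading and writing the ZooKeeper nodes that Kafka's high-level consumer uses. A rough sketch of the two helpers, assuming a ZkClient configured with a string serializer such as the KafkaZKStringSerializer used above (the default value for a missing node is an assumption here):

    import org.I0Itec.zkclient.ZkClient;

    public class ZkOffsetUtil {

        // Kafka's high-level consumer stores offsets under this path convention
        private static String offsetPath(String groupId, String topic, int partition) {
            return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
        }

        public static void setOffset(ZkClient zk, String groupId, String topic, int partition, long offset) {
            String path = offsetPath(groupId, topic, partition);
            if (!zk.exists(path)) {
                zk.createPersistent(path, true); // create parent nodes as needed
            }
            zk.writeData(path, Long.toString(offset));
        }

        public static long getOffset(ZkClient zk, String groupId, String topic, int partition) {
            String data = zk.readData(offsetPath(groupId, topic, partition), true); // null if the node is missing
            return data == null ? 0L : Long.parseLong(data);
        }
    }
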
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
deleted file mode 100644
index 18fa46f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.utils.Time;
-
-public class KafkaLocalSystemTime implements Time {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
-}
-
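
kafka.utils.Time is the broker's pluggable clock abstraction; this class existed only so the embedded test broker could be constructed with an explicit clock, as in the getKafkaServer() helper of the deleted test above:

    KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
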
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
new file mode 100644
index 0000000..7befe14
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.KafkaSink;
+import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Random;
+
+@SuppressWarnings("serial")
+public class DataGenerators {
+
+ public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
+ String brokerConnection, String topic,
+ int numPartitions,
+ final int from, final int to) throws Exception {
+
+ TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+ env.setParallelism(numPartitions);
+ env.getConfig().disableSysoutLogging();
+ env.setNumberOfExecutionRetries(0);
+
+ DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
+ new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int cnt = from;
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ while (running && cnt <= to) {
+ ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
+ new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+ new Tuple2Partitioner(numPartitions)
+ ));
+
+ env.execute("Data generator (Int, Int) stream to topic " + topic);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+ String brokerConnection, String topic,
+ final int numPartitions,
+ final int numElements,
+ final boolean randomizeOrder) throws Exception {
+ env.setParallelism(numPartitions);
+ env.getConfig().disableSysoutLogging();
+ env.setNumberOfExecutionRetries(0);
+
+ DataStream<Integer> stream = env.addSource(
+ new RichParallelSourceFunction<Integer>() {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) {
+ // create a sequence
+ int[] elements = new int[numElements];
+ for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+ i < numElements;
+ i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+ elements[i] = val;
+ }
+
+ // scramble the sequence
+ if (randomizeOrder) {
+ Random rnd = new Random();
+ for (int i = 0; i < elements.length; i++) {
+ int otherPos = rnd.nextInt(elements.length);
+
+ int tmp = elements[i];
+ elements[i] = elements[otherPos];
+ elements[otherPos] = tmp;
+ }
+ }
+
+ // emit the sequence
+ int pos = 0;
+ while (running && pos < elements.length) {
+ ctx.collect(elements[pos++]);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream
+ .rebalance()
+ .addSink(new KafkaSink<>(brokerConnection, topic,
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
+ new SerializableKafkaPartitioner() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return ((Integer) key) % numPartitions;
+ }
+ }));
+
+ env.execute("Scrambles int sequence generator");
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class InfiniteStringsGenerator extends Thread {
+
+ private final String kafkaConnectionString;
+
+ private final String topic;
+
+ private volatile Throwable error;
+
+ private volatile boolean running = true;
+
+
+ public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
+ this.kafkaConnectionString = kafkaConnectionString;
+ this.topic = topic;
+ }
+
+ @Override
+ public void run() {
+ // we manually feed data into the Kafka sink
+ KafkaSink<String> producer = null;
+ try {
+ producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+ producer.open(new Configuration());
+
+ final StringBuilder bld = new StringBuilder();
+ final Random rnd = new Random();
+
+ while (running) {
+ bld.setLength(0);
+
+ int len = rnd.nextInt(100) + 1;
+ for (int i = 0; i < len; i++) {
+ bld.append((char) (rnd.nextInt(20) + 'a') );
+ }
+
+ String next = bld.toString();
+ producer.invoke(next);
+ }
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ finally {
+ if (producer != null) {
+ try {
+ producer.close();
+ }
+ catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.running = false;
+ this.interrupt();
+ }
+
+ public Throwable getError() {
+ return this.error;
+ }
+ }
+}
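
A consumer test might drive these generators roughly as follows (topic name and broker address are placeholders; the actual call sites live in the connector's test classes):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
    // write the integers 0..999 to "testTopic", spread over 3 partitions and emitted in scrambled order
    DataGenerators.generateRandomizedIntegerSequence(env, "localhost:9092", "testTopic", 3, 1000, true);
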
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
new file mode 100644
index 0000000..b89bd5c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink function that discards data.
+ * @param <T> The type of the function.
+ */
+public class DiscardingSink<T> implements SinkFunction<T> {
+
+ private static final long serialVersionUID = 2777597566520109843L;
+
+ @Override
+ public void invoke(T value) {}
+}
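
Typical use is to terminate a test topology whose assertions happen inside earlier operators, for example:

    // results are verified upstream; the sink just swallows the records
    stream.addSink(new DiscardingSink<Tuple2<Integer, Integer>>());
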
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..7796af9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+ Checkpointed<Integer>, CheckpointNotifier, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean failedBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+ private final int failCount;
+ private int numElementsTotal;
+ private int numElementsThisTime;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+ private Thread printer;
+ private volatile boolean printerRunning = true;
+
+ public FailingIdentityMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ printer = new Thread(this, "FailingIdentityMapper Status Printer");
+ printer.start();
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+ numElementsThisTime++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ failedBefore = true;
+ throw new Exception("Artificial Test Failure");
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void close() throws Exception {
+ printerRunning = false;
+ if (printer != null) {
+ printer.interrupt();
+ printer = null;
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsTotal;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ numElementsTotal = state;
+ }
+
+ @Override
+ public void run() {
+ while (printerRunning) {
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ numElementsThisTime, numElementsTotal);
+ }
+ }
+}
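
Given some DataStream<Integer> stream, the usual test pattern around this mapper looks
roughly as follows (a sketch, not part of the patch); the static flag must be reset
before each run because it is JVM-wide and survives across tests:

    FailingIdentityMapper.failedBefore = false; // reset shared state before the run

    stream
            .map(new FailingIdentityMapper<Integer>(1000)) // one subtask fails after ~1000 elements
            .addSink(new DiscardingSink<Integer>());
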
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..a7fa2ff
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for tests to communicate with a JobManager actor, for example to cancel
+ * the single job that is currently running.
+ */
+public class JobManagerCommunicationUtils {
+
+ private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+
+ public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+
+ // find the jobID
+ Future<Object> listResponse = jobManager.ask(
+ JobManagerMessages.getRequestRunningJobsStatus(),
+ askTimeout);
+
+ List<JobStatusMessage> jobs;
+ try {
+ Object result = Await.result(listResponse, askTimeout);
+ jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+ }
+ catch (Exception e) {
+ throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+ }
+
+ if (jobs.isEmpty()) {
+ throw new Exception("Could not cancel job - no running jobs");
+ }
+ if (jobs.size() != 1) {
+ throw new Exception("Could not cancel job - more than one running job.");
+ }
+
+ JobStatusMessage status = jobs.get(0);
+ if (status.getJobState().isTerminalState()) {
+ throw new Exception("Could not cancel job - job is not running any more");
+ }
+
+ JobID jobId = status.getJobId();
+
+ Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
+ try {
+ Await.result(response, askTimeout);
+ }
+ catch (Exception e) {
+ throw new Exception("Sending the 'cancel' message failed.", e);
+ }
+ }
+}
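
Assuming a test mini-cluster that exposes its JobManager ActorGateway (the accessor
name below is illustrative, not taken from this patch), cancelling the single running
job then reads:

    ActorGateway jobManager = cluster.getJobManagerGateway(); // illustrative accessor
    JobManagerCommunicationUtils.cancelCurrentJob(jobManager);
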
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
new file mode 100644
index 0000000..1f71271
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link RuntimeContext} mock for unit tests that only supplies the parallelism and
+ * the subtask index. All other methods are no-ops or return null.
+ */
+public class MockRuntimeContext implements RuntimeContext {
+
+ private final int numberOfParallelSubtasks;
+ private final int indexOfThisSubtask;
+
+ public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+ this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+ this.indexOfThisSubtask = indexOfThisSubtask;
+ }
+
+
+ @Override
+ public String getTaskName() {
+ return null;
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return numberOfParallelSubtasks;
+ }
+
+ @Override
+ public int getIndexOfThisSubtask() {
+ return indexOfThisSubtask;
+ }
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return null;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return null;
+ }
+
+ @Override
+ public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+
+ @Override
+ public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+ return null;
+ }
+
+ @Override
+ public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+ return null;
+ }
+
+ @Override
+ public IntCounter getIntCounter(String name) {
+ return null;
+ }
+
+ @Override
+ public LongCounter getLongCounter(String name) {
+ return null;
+ }
+
+ @Override
+ public DoubleCounter getDoubleCounter(String name) {
+ return null;
+ }
+
+ @Override
+ public Histogram getHistogram(String name) {
+ return null;
+ }
+
+ @Override
+ public <RT> List<RT> getBroadcastVariable(String name) {
+ return null;
+ }
+
+ @Override
+ public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+ return null;
+ }
+
+ @Override
+ public DistributedCache getDistributedCache() {
+ return null;
+ }
+
+ @Override
+ public <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned) throws IOException {
+ return null;
+ }
+}
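
A sketch of the intended use (assumed, not shown in the patch): injecting a fake
parallelism and subtask index into a rich function before driving its lifecycle
directly in a unit test:

    FailingIdentityMapper<Integer> mapper = new FailingIdentityMapper<Integer>(100);
    mapper.setRuntimeContext(new MockRuntimeContext(4, 1)); // pretend to be subtask 1 of 4
    mapper.open(new Configuration());
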
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..c59e779
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapper that validates that each parallel subtask sees elements from no more than a
+ * bounded number of Kafka partitions. The partition of a value is reconstructed as
+ * {@code value % numPartitions}, which assumes the test data was written with exactly
+ * that partitioning.
+ */
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+ private static final long serialVersionUID = 1088381231244959088L;
+
+ /* the partitions from which this function received data */
+ private final Set<Integer> myPartitions = new HashSet<>();
+
+ private final int numPartitions;
+ private final int maxPartitions;
+
+ public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+ this.numPartitions = numPartitions;
+ this.maxPartitions = maxPartitions;
+ }
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ // reconstruct the partition this value was written to and record it
+ int partition = value % numPartitions;
+ myPartitions.add(partition);
+ if (myPartitions.size() > maxPartitions) {
+ throw new Exception("Error: Elements from too many different partitions: " + myPartitions
+ + ". Expected elements from at most " + maxPartitions + " partitions");
+ }
+ return value;
+ }
+}
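
In the Kafka tests this mapper sits directly behind the source. A sketch of that wiring
(kafkaSource and numPartitions are assumed to be provided by the test), with each source
subtask expected to read exactly one partition:

    env.addSource(kafkaSource) // emits the Integer test records
            .map(new PartitionValidatingMapper(numPartitions, 1))
            .addSink(new DiscardingSink<Integer>());
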
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
new file mode 100644
index 0000000..60e2e51
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+/**
+ * Exception that is thrown to terminate a program and indicate success.
+ */
+public class SuccessException extends Exception {
+ private static final long serialVersionUID = -7011865671593955887L;
+}
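
The matching driver-side pattern (a sketch; the exact cause chain depends on how the
runtime wraps user exceptions) is to catch the exception around job execution and treat
it as a pass:

    try {
        env.execute();
        fail("Job should have terminated with a SuccessException");
    }
    catch (JobExecutionException e) {
        if (!(e.getCause() instanceof SuccessException)) {
            throw e; // a genuine failure
        }
        // SuccessException: the sink saw everything it expected
    }
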
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..872d42f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * An identity map function that sleeps between elements, throttling the
+ * processing speed.
+ *
+ * @param <T> The type mapped.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+ private static final long serialVersionUID = 467008933767159126L;
+
+ /** The sleep interval per element, in milliseconds. */
+ private final int sleep;
+
+ public ThrottledMapper(int sleep) {
+ this.sleep = sleep;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ Thread.sleep(this.sleep);
+ return value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..1e5f027
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2Partitioner implements SerializableKafkaPartitioner {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int expectedPartitions;
+
+
+ public Tuple2Partitioner(int expectedPartitions) {
+ this.expectedPartitions = expectedPartitions;
+ }
+
+ @Override
+ public int partition(Object key, int numPartitions) {
+ if (numPartitions != expectedPartitions) {
+ throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+ }
+ @SuppressWarnings("unchecked")
+ Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+
+ return element.f0;
+ }
+}
\ No newline at end of file
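
A small sketch (not in the patch) exercising the partitioner directly: the first tuple
field selects the partition, and a mismatching partition count is rejected:

    SerializableKafkaPartitioner partitioner = new Tuple2Partitioner(3);

    int partition = partitioner.partition(new Tuple2<Integer, Integer>(2, 42), 3); // returns 2
    partitioner.partition(new Tuple2<Integer, Integer>(0, 7), 4); // throws IllegalArgumentException
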
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..ec8db73
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+/**
+ * A sink that validates exactly-once delivery of a gap-free sequence of integers in the
+ * range [0, numElementsTotal). It fails on duplicates and, once all elements have arrived,
+ * throws a {@link SuccessException} to terminate the job.
+ */
+public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+ private static final long serialVersionUID = 1748426382527469932L;
+
+ private final int numElementsTotal;
+
+ private BitSet duplicateChecker = new BitSet(); // this is checkpointed
+
+ private int numElements; // this is checkpointed
+
+
+ public ValidatingExactlyOnceSink(int numElementsTotal) {
+ this.numElementsTotal = numElementsTotal;
+ }
+
+
+ @Override
+ public void invoke(Integer value) throws Exception {
+ numElements++;
+
+ if (duplicateChecker.get(value)) {
+ throw new Exception("Received a duplicate");
+ }
+ duplicateChecker.set(value);
+ if (numElements == numElementsTotal) {
+ // validate
+ if (duplicateChecker.cardinality() != numElementsTotal) {
+ throw new Exception("Duplicate checker has wrong cardinality");
+ }
+ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+ throw new Exception("Received sparse sequence");
+ }
+ else {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ @Override
+ public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+ LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
+ return new Tuple2<>(numElements, duplicateChecker);
+ }
+
+ @Override
+ public void restoreState(Tuple2<Integer, BitSet> state) {
+ LOG.info("restoring num elements to {}", state.f0);
+ this.numElements = state.f0;
+ this.duplicateChecker = state.f1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
index dc20726..6bdfb48 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
@@ -24,4 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 2bffefd..4c002d1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -53,7 +53,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(), getParallelism(), -1, getConfig().isSysoutLoggingEnabled());
+ JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(),
+ getParallelism(), -1, getConfig().isSysoutLoggingEnabled(), false, this.conf);
transformations.clear();
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 906d35d..2352623 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1196,7 +1196,7 @@ public abstract class StreamExecutionEnvironment {
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
- currentEnvironment = new LocalStreamEnvironment(configuration);
+ LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
return (LocalStreamEnvironment) currentEnvironment;
}