Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:53 UTC
[36/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
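For context on "our own topic metadata retrieval": against Kafka 0.8 brokers this
is typically done by sending a TopicMetadataRequest through the kafka.javaapi
SimpleConsumer. The following is a minimal sketch of that pattern, not the code
added by this commit; the host, port, timeouts, and client id are placeholder
values.

    import java.util.Collections;
    import java.util.List;

    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class TopicMetadataSketch {

        // Asks a single broker for the partition metadata of one topic.
        static List<PartitionMetadata> getPartitionsForTopic(String host, int port, String topic) {
            // soTimeout, bufferSize, and clientId are illustrative values
            SimpleConsumer consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "metadata-fetcher");
            try {
                TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
                TopicMetadataResponse response = consumer.send(request);
                for (TopicMetadata metadata : response.topicsMetadata()) {
                    if (topic.equals(metadata.topic())) {
                        return metadata.partitionsMetadata();
                    }
                }
                throw new RuntimeException("Broker returned no metadata for topic " + topic);
            } finally {
                consumer.close();
            }
        }
    }

In practice such a lookup is retried against several bootstrap brokers, since any
single broker may be down or may not yet know about the topic.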
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
deleted file mode 100644
index 7b4961d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.connectors.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.testutils.DiscardingSink;
-import org.apache.flink.streaming.connectors.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.connectors.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-
-import scala.collection.Seq;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
-
- // ------------------------------------------------------------------------
- // Required methods by the abstract test base
- // ------------------------------------------------------------------------
-
- protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
- String topic, DeserializationSchema<T> deserializationSchema, Properties props);
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- //
- // The tests here are not activated by an @Test annotation; they need to
- // be invoked from the extending classes, so that each class can select
- // which tests to run (a sketch of such a subclass follows this diff).
- // ------------------------------------------------------------------------
-
- /**
- * Test that validates that checkpointing and checkpoint notification work properly.
- */
- public void runCheckpointingTest() {
- try {
- createTestTopic("testCheckpointing", 1, 1);
-
- FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
- Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
- pendingCheckpointsField.setAccessible(true);
- LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
- Assert.assertEquals(0, pendingCheckpoints.size());
- source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
- final long[] initialOffsets = new long[] { 1337 };
-
- // first restore
- source.restoreState(initialOffsets);
-
- // then open
- source.open(new Configuration());
- long[] state1 = source.snapshotState(1, 15);
-
- assertArrayEquals(initialOffsets, state1);
-
- long[] state2 = source.snapshotState(2, 30);
- Assert.assertArrayEquals(initialOffsets, state2);
- Assert.assertEquals(2, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(1);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(2);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(666); // invalid checkpoint
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- source.snapshotState(i, 15 * i);
- }
- Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
- // commit only the second-to-last checkpoint
- source.notifyCheckpointComplete(598);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- // access invalid checkpoint
- source.notifyCheckpointComplete(590);
-
- // and the last
- source.notifyCheckpointComplete(599);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.close();
-
- deleteTestTopic("testCheckpointing");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
- *
- * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
- */
- public void runOffsetInZookeeperValidationTest() {
- try {
- LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
- final String topicName = "testOffsetHacking";
- final int parallelism = 3;
-
- createTestTopic(topicName, parallelism, 1);
-
- StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env1.getConfig().disableSysoutLogging();
- env1.enableCheckpointing(50);
- env1.setNumberOfExecutionRetries(0);
- env1.setParallelism(parallelism);
-
- StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env2.getConfig().disableSysoutLogging();
- env2.enableCheckpointing(50);
- env2.setNumberOfExecutionRetries(0);
- env2.setParallelism(parallelism);
-
- StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env3.getConfig().disableSysoutLogging();
- env3.enableCheckpointing(50);
- env3.setNumberOfExecutionRetries(0);
- env3.setParallelism(parallelism);
-
- // write a sequence from 0 to 99 to each of the 3 partitions.
- writeSequence(env1, topicName, 100, parallelism);
-
- readSequence(env2, standardProps, parallelism, topicName, 100, 0);
-
- ZkClient zkClient = createZookeeperClient();
-
- long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
- long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
- long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
-
- LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
- assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
- assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
-
- LOG.info("Manipulating offsets");
-
- // set the offset to 49 for the three partitions, so consumption resumes at 50
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
-
- zkClient.close();
-
- // read the topic again, this time starting from the manipulated offsets
- readSequence(env3, standardProps, parallelism, topicName, 50, 50);
-
- deleteTestTopic(topicName);
-
- LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Ensure Kafka is working on both producer and consumer side.
- * This executes a job that contains two Flink pipelines.
- *
- * <pre>
- * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
- * </pre>
- */
- public void runSimpleConcurrentProducerConsumerTopology() {
- try {
- LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
- final String topic = "concurrentProducerConsumerTopic";
- final int parallelism = 3;
- final int elementsPerPartition = 100;
- final int totalElements = parallelism * elementsPerPartition;
-
- createTestTopic(topic, parallelism, 2);
-
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
-
- TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
- // ----------- add producer dataflow ----------
-
- DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) {
- int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
- int limit = cnt + elementsPerPartition;
-
-
- while (running && cnt < limit) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
-
- // ----------- add consumer dataflow ----------
-
- FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
-
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
- consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
- private int elCnt = 0;
- private BitSet validator = new BitSet(totalElements);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- assertFalse("Received tuple twice", validator.get(v));
- validator.set(v);
- elCnt++;
-
- if (elCnt == totalElements) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != totalElements) {
- fail("The bitset was not set to 1 on all elements. Next clear:"
- + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- }).setParallelism(1);
-
- tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
-
- LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests the proper consumption when having a 1:1 correspondence between Kafka partitions and
- * Flink sources.
- */
- public void runOneToOneExactlyOnceTest() {
- try {
- LOG.info("Starting runOneToOneExactlyOnceTest()");
-
- final String topic = "oneToOneTopic";
- final int parallelism = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- createTestTopic(topic, parallelism, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-to-one exactly once test");
-
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
- * one Flink source will read multiple Kafka partitions.
- */
- public void runOneSourceMultiplePartitionsExactlyOnceTest() {
- try {
- LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
- final String topic = "oneToManyTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- final int parallelism = 2;
-
- createTestTopic(topic, numPartitions, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 3))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-source-multi-partitions exactly once test");
-
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
- * that some Flink sources will read no partitions.
- */
- public void runMultipleSourcesOnePartitionExactlyOnceTest() {
- try {
- LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
- final String topic = "manyToOneTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- final int parallelism = 8;
-
- createTestTopic(topic, numPartitions, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
- env.setBufferTimeout(0);
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "multi-source-one-partitions exactly once test");
-
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- /**
- * Tests that the source can be properly canceled when reading full partitions.
- */
- public void runCancelingOnFullInputTest() {
- try {
- final String topic = "cancelingOnFullTopic";
-
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
-
- // launch a producer thread
- DataGenerators.InfiniteStringsGenerator generator =
- new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
- generator.start();
-
- // launch a consumer asynchronously
-
- final AtomicReference<Throwable> jobError = new AtomicReference<>();
-
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute();
- }
- catch (Throwable t) {
- jobError.set(t);
- }
- }
- };
-
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
-
- // wait a bit before canceling
- Thread.sleep(2000);
-
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
-
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
-
- Throwable failureCause = jobError.get();
- assertNotNull("program did not fail properly due to canceling", failureCause);
- assertTrue(failureCause.getMessage().contains("Job was cancelled"));
-
- if (generator.isAlive()) {
- generator.shutdown();
- generator.join();
- }
- else {
- Throwable t = generator.getError();
- if (t != null) {
- t.printStackTrace();
- fail("Generator failed: " + t.getMessage());
- } else {
- fail("Generator failed with no exception");
- }
- }
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that the source can be properly canceled when reading empty partitions.
- */
- public void runCancelingOnEmptyInputTest() {
- try {
- final String topic = "cancelingOnEmptyInputTopic";
-
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute();
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
-
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
-
- // wait a bit before canceling
- Thread.sleep(2000);
-
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
-
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
-
- Throwable failureCause = error.get();
- assertNotNull("program did not fail properly due to canceling", failureCause);
- assertTrue(failureCause.getMessage().contains("Job was cancelled"));
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that deployment fails with a NoResourceAvailableException when the requested parallelism exceeds the available slots.
- */
- public void runFailOnDeployTest() {
- try {
- final String topic = "failOnDeployTopic";
-
- createTestTopic(topic, 2, 1);
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(12); // needs to be more than the mini cluster has slots
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .addSink(new DiscardingSink<Integer>());
-
- try {
- env.execute();
- fail("this test should fail with an exception");
- }
- catch (ProgramInvocationException e) {
-
- // validate that we failed due to a NoResourceAvailableException
- Throwable cause = e.getCause();
- int depth = 0;
- boolean foundResourceException = false;
-
- while (cause != null && depth++ < 20) {
- if (cause instanceof NoResourceAvailableException) {
- foundResourceException = true;
- break;
- }
- cause = cause.getCause();
- }
-
- assertTrue("Wrong exception", foundResourceException);
- }
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Test Flink's Kafka integration also with very big records (30MB)
- * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
- */
- public void runBigRecordTestTopology() {
- try {
- LOG.info("Starting runBigRecordTestTopology()");
-
- final String topic = "bigRecordTestTopic";
- final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-
- createTestTopic(topic, parallelism, 1);
-
- final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
- env.enableCheckpointing(100);
- env.setParallelism(parallelism);
-
- // add consuming topology:
- Properties consumerProps = new Properties();
- consumerProps.putAll(standardProps);
- consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
- consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
- consumerProps.setProperty("queued.max.message.chunks", "1");
-
- FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
- DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
- consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
- private int elCnt = 0;
-
- @Override
- public void invoke(Tuple2<Long, byte[]> value) throws Exception {
- elCnt++;
- if (value.f0 == -1) {
- // we should have seen 11 elements now.
- if (elCnt == 11) {
- throw new SuccessException();
- } else {
- throw new RuntimeException("There have been " + elCnt + " elements");
- }
- }
- if (elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: " + elCnt);
- }
- }
- });
-
- // add producing topology
- Properties producerProps = new Properties();
- producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
-
- DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
- private boolean running;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- running = true;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
- Random rnd = new Random();
- long cnt = 0;
- int fifteenMb = 1024 * 1024 * 15;
-
- while (running) {
- byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
- ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
-
- Thread.sleep(100);
-
- if (cnt == 10) {
- // signal end
- ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
- break;
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
- producerProps, deserSchema));
-
- tryExecute(env, "big topology test");
-
- deleteTestTopic(topic);
-
- LOG.info("Finished runBigRecordTestTopology()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- public void runBrokerFailureTest() {
- try {
- LOG.info("starting runBrokerFailureTest()");
-
- final String topic = "brokerFailureTestTopic";
-
- final int parallelism = 2;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
-
- createTestTopic(topic, parallelism, 2);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // find leader to shut down
- ZkClient zkClient = createZookeeperClient();
- PartitionMetadata firstPart = null;
- do {
- if (firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
-
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
- }
- while (firstPart.errorCode() != 0);
- zkClient.close();
-
- final String leaderToShutDown = firstPart.leader().get().connectionString();
- LOG.info("Leader to shutdown {}", leaderToShutDown);
-
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(500);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- BrokerKillingMapper.killedLeaderBefore = false;
- tryExecute(env, "One-to-one exactly once test");
-
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
-
- LOG.info("finished runBrokerFailureTest()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Reading / writing test data sets
- // ------------------------------------------------------------------------
-
- private void readSequence(StreamExecutionEnvironment env, Properties cc,
- final int sourceParallelism,
- final String topicName,
- final int valuesCount, final int startFrom) throws Exception {
-
- final int finalCount = valuesCount * sourceParallelism;
-
- final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
- new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
- // create the consumer
- FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
-
- DataStream<Tuple2<Integer, Integer>> source = env
- .addSource(consumer).setParallelism(sourceParallelism)
- .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
- // verify data
- source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
- private int[] values = new int[valuesCount];
- private int count = 0;
-
- @Override
- public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
- values[value.f1 - startFrom]++;
- count++;
-
- // verify if we've seen everything
- if (count == finalCount) {
- for (int i = 0; i < values.length; i++) {
- int v = values[i];
- if (v != sourceParallelism) {
- printTopic(topicName, valuesCount, deser);
- throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
- }
- }
- // test has passed
- throw new SuccessException();
- }
- }
-
- }).setParallelism(1);
-
- tryExecute(env, "Read data from Kafka");
-
- LOG.info("Successfully read sequence for verification");
- }
-
- private static void writeSequence(StreamExecutionEnvironment env, String topicName,
- final int numElements, int parallelism) throws Exception {
-
- TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int cnt = 0;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
-
- while (running && cnt < numElements) {
- ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }).setParallelism(parallelism);
-
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
- topicName,
- new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
- new Tuple2Partitioner(parallelism)
- )).setParallelism(parallelism);
-
- env.execute("Write sequence");
-
- LOG.info("Finished writing sequence");
- }
-
- // ------------------------------------------------------------------------
- // Debugging utilities
- // ------------------------------------------------------------------------
-
- /**
- * Read topic to list, only using Kafka code.
- */
- private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
- ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
- // we request only one stream per consumer instance. Kafka will make sure that each consumer group
- // will see each message only once.
- Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
- if (streams.size() != 1) {
- throw new RuntimeException("Expected only one message stream but got " + streams.size());
- }
- List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
- if (kafkaStreams == null) {
- throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
- }
- if (kafkaStreams.size() != 1) {
- throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
- }
- LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
- ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
- List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
- int read = 0;
- while (iteratorToRead.hasNext()) {
- read++;
- result.add(iteratorToRead.next());
- if (read == stopAfter) {
- LOG.info("Read " + read + " elements");
- return result;
- }
- }
- return result;
- }
-
- private static void printTopic(String topicName, ConsumerConfig config,
- DeserializationSchema<?> deserializationSchema,
- int stopAfter) {
-
- List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
- LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
- for (MessageAndMetadata<byte[], byte[]> message: contents) {
- Object out = deserializationSchema.deserialize(message.message());
- LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
- }
- }
-
- private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer) {
- // write the sequence to log for debugging purposes
- Properties stdProps = standardCC.props().props();
- Properties newProps = new Properties(stdProps);
- newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
- newProps.setProperty("auto.offset.reset", "smallest");
- newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
- ConsumerConfig printerConfig = new ConsumerConfig(newProps);
- printTopic(topicName, printerConfig, deserializer, elements);
- }
-
-
- public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
- implements Checkpointed<Integer>, CheckpointNotifier {
-
- private static final long serialVersionUID = 6334389850158707313L;
-
- public static volatile boolean killedLeaderBefore;
- public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
- private final String leaderToShutDown;
- private final int failCount;
- private int numElementsTotal;
-
- private boolean failer;
- private boolean hasBeenCheckpointed;
-
-
- public BrokerKillingMapper(String leaderToShutDown, int failCount) {
- this.leaderToShutDown = leaderToShutDown;
- this.failCount = failCount;
- }
-
- @Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
- }
-
- @Override
- public T map(T value) throws Exception {
- numElementsTotal++;
-
- if (!killedLeaderBefore) {
- Thread.sleep(10);
-
- if (failer && numElementsTotal >= failCount) {
- // shut down a Kafka broker
- KafkaServer toShutDown = null;
- for (KafkaServer kafkaServer : brokers) {
- if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort())) {
- toShutDown = kafkaServer;
- break;
- }
- }
-
- if (toShutDown == null) {
- throw new Exception("Cannot find broker to shut down");
- }
- else {
- hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
- killedLeaderBefore = true;
- toShutDown.shutdown();
- }
- }
- }
- return value;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- hasBeenCheckpointed = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
- }
-
- @Override
- public void restoreState(Integer state) {
- this.numElementsTotal = state;
- }
- }
-}
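For orientation: the abstract suite above is wired up by concrete subclasses that
supply the consumer variant and opt into individual tests. A rough sketch of such
a subclass follows; the class and test names are illustrative, not from this
commit, while the constructor variant mirrors the one used in KafkaProducerITCase
further below.

    import java.util.Properties;

    import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    import org.junit.Test;

    public class ExampleKafkaITCase extends KafkaConsumerTestBase {

        @Override
        protected <T> FlinkKafkaConsumer<T> getConsumer(
                String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
            // pick the offset store and fetcher type for the variant under test
            return new FlinkKafkaConsumer<>(topic, deserializationSchema, props,
                    FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
                    FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
        }

        @Test
        public void testCheckpointing() {
            runCheckpointingTest();
        }

        @Test
        public void testOneToOneExactlyOnce() {
            runOneToOneExactlyOnceTest();
        }
    }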
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
deleted file mode 100644
index b910b54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- @Override
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
deleted file mode 100644
index fd980d9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
- /**
- *
- * <pre>
- * +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
- * / | \
- * / | \
- * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
- * \ | /
- * \ | /
- * +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
- * </pre>
- *
- * The mapper validates that the values come consistently from the correct Kafka partition.
- *
- * The final sink validates that there are no duplicates and that all partitions are present.
- */
- @Test
- public void testCustomPartitioning() {
- try {
- LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
- final String topic = "customPartitioningTestTopic";
- final int parallelism = 3;
-
- createTestTopic(topic, parallelism, 1);
-
- TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
- // ------ producing topology ---------
-
- // source has DOP 1 to make sure it generates no duplicates
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
- long cnt = 0;
- while (running) {
- ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- })
- .setParallelism(1);
-
- // sink that partitions the stream into the topic's Kafka partitions
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(
- brokerConnectionStrings, topic, serSchema, new CustomPartitioner(parallelism)))
- .setParallelism(parallelism);
-
- // ------ consuming topology ---------
-
- FlinkKafkaConsumer<Tuple2<Long, String>> source =
- new FlinkKafkaConsumer<>(topic, deserSchema, standardProps,
- FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
- FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-
- env.addSource(source).setParallelism(parallelism)
-
- // mapper that validates partitioning and maps to partition
- .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-
- private int ourPartition = -1;
- @Override
- public Integer map(Tuple2<Long, String> value) {
- int partition = value.f0.intValue() % parallelism;
- if (ourPartition != -1) {
- assertEquals("inconsistent partitioning", ourPartition, partition);
- } else {
- ourPartition = partition;
- }
- return partition;
- }
- }).setParallelism(parallelism)
-
- .addSink(new SinkFunction<Integer>() {
-
- private int[] valuesPerPartition = new int[parallelism];
-
- @Override
- public void invoke(Integer value) throws Exception {
- valuesPerPartition[value]++;
-
- boolean missing = false;
- for (int i : valuesPerPartition) {
- if (i < 100) {
- missing = true;
- break;
- }
- }
- if (!missing) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
-
- tryExecute(env, "custom partitioning test");
-
- deleteTestTopic(topic);
-
- LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- // ------------------------------------------------------------------------
-
- public static class CustomPartitioner implements SerializableKafkaPartitioner {
-
- private final int expectedPartitions;
-
- public CustomPartitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Object key, int numPartitions) {
- @SuppressWarnings("unchecked")
- Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-
- assertEquals(expectedPartitions, numPartitions);
-
- return (int) (tuple.f0 % numPartitions);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
deleted file mode 100644
index 35f050c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.admin.AdminUtils;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- * <li>A ZooKeeper mini cluster</li>
- * <li>Three Kafka Brokers (mini clusters)</li>
- * <li>A Flink mini cluster</li>
- * </ul>
- *
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase {
-
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-
- protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
- protected static String zookeeperConnectionString;
-
- protected static File tmpZkDir;
-
- protected static File tmpKafkaParent;
-
- protected static TestingServer zookeeper;
- protected static List<KafkaServer> brokers;
- protected static String brokerConnectionStrings = "";
-
- protected static ConsumerConfig standardCC;
- protected static Properties standardProps;
-
- protected static ForkableFlinkMiniCluster flink;
-
- protected static int flinkPort;
-
-
-
- // ------------------------------------------------------------------------
- // Setup and teardown of the mini clusters
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void prepare() throws IOException {
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting KafkaITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- LOG.info("Starting KafkaITCase.prepare()");
-
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
- tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
- assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
- tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
- assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
- List<File> tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- File tmpDir = new File(tmpKafkaParent, "server-" + i);
- assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
- tmpKafkaDirs.add(tmpDir);
- }
-
- String kafkaHost = "localhost";
- int zkPort = NetUtils.getAvailablePort();
- zookeeperConnectionString = "localhost:" + zkPort;
-
- zookeeper = null;
- brokers = null;
-
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(zkPort, tmpZkDir);
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
-
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
- SocketServer socketServer = brokers.get(i).socketServer();
-
- String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionStrings += host + ":" + socketServer.port() + ",";
- }
-
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
- }
-
- standardProps = new Properties();
-
- standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
- standardProps.setProperty("group.id", "flink-tests");
- standardProps.setProperty("auto.commit.enable", "false");
- standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
- standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
- Properties consumerConfigProps = new Properties();
- consumerConfigProps.putAll(standardProps);
- consumerConfigProps.setProperty("auto.offset.reset", "smallest");
- standardCC = new ConsumerConfig(consumerConfigProps);
-
- // start also a re-usable Flink mini cluster
-
- Configuration flinkConfig = new Configuration();
- flinkConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
-
-// flinkConfig.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
-// flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8080);
-
- flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
- flinkPort = flink.getJobManagerRPCPort();
- }
-
- @AfterClass
- public static void shutDownServices() {
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Shut down KafkaITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- flinkPort = -1;
- if (flink != null) {
- flink.shutdown();
- }
-
- // guard against NPEs when the @BeforeClass setup failed part-way
- if (brokers != null) {
- for (KafkaServer broker : brokers) {
- if (broker != null) {
- broker.shutdown();
- }
- }
- brokers.clear();
- }
-
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- }
- catch (Exception e) {
- LOG.warn("ZK.stop() failed", e);
- }
- zookeeper = null;
- }
-
- // clean up the temp spaces
-
- if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
- try {
- FileUtils.deleteDirectory(tmpKafkaParent);
- }
- catch (Exception e) {
- // ignore
- }
- }
- if (tmpZkDir != null && tmpZkDir.exists()) {
- try {
- FileUtils.deleteDirectory(tmpZkDir);
- }
- catch (Exception e) {
- // ignore
- }
- }
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" KafkaITCase finished");
- LOG.info("-------------------------------------------------------------------------");
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
- String kafkaHost,
- String zookeeperConnectionString) throws Exception {
- Properties kafkaProperties = new Properties();
-
- int kafkaPort = NetUtils.getAvailablePort();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", kafkaHost);
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- kafkaProperties.put("message.max.bytes", "" + (50 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", "" + (50 * 1024 * 1024));
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
-
- // ------------------------------------------------------------------------
- // Execution utilities
- // ------------------------------------------------------------------------
-
- protected ZkClient createZookeeperClient() {
- return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
- }
-
- protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- }
- catch (ProgramInvocationException | JobExecutionException root) {
- Throwable cause = root.getCause();
-
- // search for nested SuccessExceptions
- int depth = 0;
- while (!(cause instanceof SuccessException)) {
- if (cause == null || depth++ == 20) {
- root.printStackTrace();
- fail("Test failed: " + root.getMessage());
- }
- else {
- cause = cause.getCause();
- }
- }
- }
- }
-
- protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-
- // create topic with one client
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topic);
-
- ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
- AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
- creator.close();
-
- // validate that the topic has been created
-
- try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
- standardProps, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()))
- {
- final long deadline = System.currentTimeMillis() + 30000;
- do {
- List<PartitionInfo> partitions = consumer.partitionsFor(topic);
- if (partitions != null && partitions.size() > 0) {
- return;
- }
- }
- while (System.currentTimeMillis() < deadline);
- fail("Test topic could not be created");
- }
- }
-
- protected static void deleteTestTopic(String topic) {
- LOG.info("Deleting topic {}", topic);
-
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
- AdminUtils.deleteTopic(zk, topic);
-
- zk.close();
- }
-}
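
For context, a sketch (not part of this commit) of how a concrete ITCase is meant to
compose the helpers above; the subclass name, topic name, parallelism, and pipeline
body are illustrative:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.junit.Test;

    public class ExampleKafkaIT extends KafkaConsumerTestBase {

        @Test
        public void runSimpleReadTest() throws Exception {
            final String topic = "exampleTopic";   // illustrative name
            createTestTopic(topic, 2, 1);          // 2 partitions, replication factor 1

            // submit against the shared mini cluster started in prepare()
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
            env.setParallelism(2);

            // ... build a pipeline that throws SuccessException once all
            //     expected records have been read ...

            tryExecute(env, "simple read test");   // unwraps nested SuccessExceptions
            deleteTestTopic(topic);
        }
    }
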
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index c412136..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.KafkaTestBase;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-
- @Test
- public void runOffsetManipulationInZooKeeperTest() {
- try {
- final String topicName = "ZookeeperOffsetHandlerTest-Topic";
- final String groupId = "ZookeeperOffsetHandlerTest-Group";
-
- final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
- ZkClient zkClient = createZookeeperClient();
- AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
-
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
-
- long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
-
- zkClient.close();
-
- assertEquals(offset, fetchedOffset);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
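
For reference, under Kafka 0.8's conventional consumer-group layout (assumed here,
not shown in this commit) the offset written by the test lives in a per-group,
per-partition znode, so it can also be inspected with the raw client:

    // assumed path convention: /consumers/<group.id>/offsets/<topic>/<partition>
    String path = "/consumers/" + groupId + "/offsets/" + topicName + "/0";
    String raw = zkClient.readData(path, true);   // second arg: return null if the node is absent
    assertEquals(Long.toString(offset), raw);
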
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
deleted file mode 100644
index 7befe14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.KafkaSink;
-import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-
- public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
- String brokerConnection, String topic,
- int numPartitions,
- final int from, final int to) throws Exception {
-
- TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setNumberOfExecutionRetries(0);
-
- DataStream<Tuple2<Integer, Integer>> stream = env.addSource(
- new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int cnt = from;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
-
- while (running && cnt <= to) {
- ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
- new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
- new Tuple2Partitioner(numPartitions)
- ));
-
- env.execute("Data generator (Int, Int) stream to topic " + topic);
- }
-
- // ------------------------------------------------------------------------
-
- public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
- String brokerConnection, String topic,
- final int numPartitions,
- final int numElements,
- final boolean randomizeOrder) throws Exception {
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setNumberOfExecutionRetries(0);
-
- DataStream<Integer> stream = env.addSource(
- new RichParallelSourceFunction<Integer>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Integer> ctx) {
- // create a sequence
- int[] elements = new int[numElements];
- for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
- i < numElements;
- i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-
- elements[i] = val;
- }
-
- // scramble the sequence
- if (randomizeOrder) {
- Random rnd = new Random();
- for (int i = 0; i < elements.length; i++) {
- int otherPos = rnd.nextInt(elements.length);
-
- int tmp = elements[i];
- elements[i] = elements[otherPos];
- elements[otherPos] = tmp;
- }
- }
-
- // emit the sequence
- int pos = 0;
- while (running && pos < elements.length) {
- ctx.collect(elements[pos++]);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream
- .rebalance()
- .addSink(new KafkaSink<>(brokerConnection, topic,
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
- new SerializableKafkaPartitioner() {
- @Override
- public int partition(Object key, int numPartitions) {
- return ((Integer) key) % numPartitions;
- }
- }));
-
- env.execute("Scrambles int sequence generator");
- }
-
- // ------------------------------------------------------------------------
-
- public static class InfiniteStringsGenerator extends Thread {
-
- private final String kafkaConnectionString;
-
- private final String topic;
-
- private volatile Throwable error;
-
- private volatile boolean running = true;
-
-
- public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
- this.kafkaConnectionString = kafkaConnectionString;
- this.topic = topic;
- }
-
- @Override
- public void run() {
- // we manually feed data into the Kafka sink
- KafkaSink<String> producer = null;
- try {
- producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
- producer.open(new Configuration());
-
- final StringBuilder bld = new StringBuilder();
- final Random rnd = new Random();
-
- while (running) {
- bld.setLength(0);
-
- int len = rnd.nextInt(100) + 1;
- for (int i = 0; i < len; i++) {
- bld.append((char) (rnd.nextInt(20) + 'a'));
- }
-
- String next = bld.toString();
- producer.invoke(next);
- }
- }
- catch (Throwable t) {
- this.error = t;
- }
- finally {
- if (producer != null) {
- try {
- producer.close();
- }
- catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- public void shutdown() {
- this.running = false;
- this.interrupt();
- }
-
- public Throwable getError() {
- return this.error;
- }
- }
-}
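
A note on intended use: InfiniteStringsGenerator is a plain thread, so the driving
test is responsible for shutting it down and surfacing its error. A minimal sketch,
with an illustrative topic name:

    InfiniteStringsGenerator generator =
            new InfiniteStringsGenerator(brokerConnectionStrings, "exampleTopic");
    generator.start();
    try {
        // ... run the consuming job under test against "exampleTopic" ...
    }
    finally {
        generator.shutdown();
        generator.join();
        if (generator.getError() != null) {
            throw new Exception("Data generator failed", generator.getError());
        }
    }
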
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
deleted file mode 100644
index b89bd5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the elements the sink receives.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
- private static final long serialVersionUID = 2777597566520109843L;
-
- @Override
- public void invoke(T value) {}
-}
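
Typical use is as the terminal operator of a pipeline whose assertions live in the
operators themselves rather than in the sink; the element type is illustrative:

    stream.addSink(new DiscardingSink<Integer>());
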
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 7796af9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
- Checkpointed<Integer>, CheckpointNotifier, Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-
- private static final long serialVersionUID = 6334389850158707313L;
-
- public static volatile boolean failedBefore;
- public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
- private final int failCount;
- private int numElementsTotal;
- private int numElementsThisTime;
-
- private boolean failer;
- private boolean hasBeenCheckpointed;
-
- private Thread printer;
- private volatile boolean printerRunning = true;
-
- public FailingIdentityMapper(int failCount) {
- this.failCount = failCount;
- }
-
- @Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
- printer = new Thread(this, "FailingIdentityMapper Status Printer");
- printer.start();
- }
-
- @Override
- public T map(T value) throws Exception {
- numElementsTotal++;
- numElementsThisTime++;
-
- if (!failedBefore) {
- Thread.sleep(10);
-
- if (failer && numElementsTotal >= failCount) {
- hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
- failedBefore = true;
- throw new Exception("Artificial Test Failure");
- }
- }
- return value;
- }
-
- @Override
- public void close() throws Exception {
- printerRunning = false;
- if (printer != null) {
- printer.interrupt();
- printer = null;
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- this.hasBeenCheckpointed = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
- }
-
- @Override
- public void restoreState(Integer state) {
- numElementsTotal = state;
- }
-
- @Override
- public void run() {
- while (printerRunning) {
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException e) {
- // ignore
- }
- LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
- getRuntimeContext().getIndexOfThisSubtask(),
- numElementsThisTime, numElementsTotal);
- }
- }
-}
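
Because failedBefore and hasBeenCheckpointedBeforeFailure are static, a test has to
reset them before each run, or a later test will silently skip the failure. A sketch
of the intended pattern (the fail count is illustrative):

    FailingIdentityMapper.failedBefore = false;
    FailingIdentityMapper.hasBeenCheckpointedBeforeFailure = false;

    stream.map(new FailingIdentityMapper<Integer>(500))   // subtask 0 fails after 500 elements
          .addSink(new DiscardingSink<Integer>());
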
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index a7fa2ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-
- private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-
- public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
- }
-
- if (jobs.isEmpty()) {
- throw new Exception("Could not cancel job - no running jobs");
- }
- if (jobs.size() != 1) {
- throw new Exception("Could not cancel job - more than one running job.");
- }
-
- JobStatusMessage status = jobs.get(0);
- if (status.getJobState().isTerminalState()) {
- throw new Exception("Could not cancel job - job is not running any more");
- }
-
- JobID jobId = status.getJobId();
-
- Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
- try {
- Await.result(response, askTimeout);
- }
- catch (Exception e) {
- throw new Exception("Sending the 'cancel' message failed.", e);
- }
- }
-}
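
Call-site sketch: the utility expects exactly one running job, matching the
one-job-per-test pattern of these ITCases. How a test obtains the ActorGateway is an
assumption here; the accessor name below is illustrative, not part of this commit:

    ActorGateway jobManager = flink.getJobManagerGateway();   // assumed accessor on the mini cluster
    JobManagerCommunicationUtils.cancelCurrentJob(jobManager);
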
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
deleted file mode 100644
index 1f71271..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MockRuntimeContext implements RuntimeContext {
-
- private final int numberOfParallelSubtasks;
- private final int indexOfThisSubtask;
-
- public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
- this.numberOfParallelSubtasks = numberOfParallelSubtasks;
- this.indexOfThisSubtask = indexOfThisSubtask;
- }
-
-
- @Override
- public String getTaskName() {
- return null;
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return numberOfParallelSubtasks;
- }
-
- @Override
- public int getIndexOfThisSubtask() {
- return indexOfThisSubtask;
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return null;
- }
-
- @Override
- public ClassLoader getUserCodeClassLoader() {
- return null;
- }
-
- @Override
- public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
-
- @Override
- public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
- return null;
- }
-
- @Override
- public Map<String, Accumulator<?, ?>> getAllAccumulators() {
- return null;
- }
-
- @Override
- public IntCounter getIntCounter(String name) {
- return null;
- }
-
- @Override
- public LongCounter getLongCounter(String name) {
- return null;
- }
-
- @Override
- public DoubleCounter getDoubleCounter(String name) {
- return null;
- }
-
- @Override
- public Histogram getHistogram(String name) {
- return null;
- }
-
- @Override
- public <RT> List<RT> getBroadcastVariable(String name) {
- return null;
- }
-
- @Override
- public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
- return null;
- }
-
- @Override
- public DistributedCache getDistributedCache() {
- return null;
- }
-
- @Override
- public <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
- return null;
- }
-
- @Override
- public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned) throws IOException {
- return null;
- }
-}
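
What the mock is for: unit-testing rich functions that only consult the subtask
index and parallelism, without a running task. A minimal sketch, reusing the failing
mapper above:

    FailingIdentityMapper<String> mapper = new FailingIdentityMapper<>(100);
    mapper.setRuntimeContext(new MockRuntimeContext(4, 0));  // parallelism 4, subtask index 0
    // getIndexOfThisSubtask() == 0, so this instance is the designated "failer"
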