You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:54 UTC
[04/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
deleted file mode 100644
index dccf698..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
-
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-/**
- * A class containing a special Kafka broker which has a log retention of only 250 ms.
- * This way, we can make sure our consumer is properly handling cases where we run into out of offset
- * errors
- */
-@SuppressWarnings("serial")
-public class KafkaShortRetentionTestBase implements Serializable {
-
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
-
- private static KafkaTestEnvironment kafkaServer;
- private static Properties standardProps;
- private static LocalFlinkMiniCluster flink;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- protected static Properties secureProps = new Properties();
-
- @BeforeClass
- public static void prepare() throws IOException, ClassNotFoundException {
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting KafkaShortRetentionTestBase ");
- LOG.info("-------------------------------------------------------------------------");
-
- Configuration flinkConfig = new Configuration();
-
- // dynamically load the implementation for the test
- Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
- kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
-
- LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
-
- if(kafkaServer.isSecureRunSupported()) {
- secureProps = kafkaServer.getSecureProperties();
- }
-
- Properties specificProperties = new Properties();
- specificProperties.setProperty("log.retention.hours", "0");
- specificProperties.setProperty("log.retention.minutes", "0");
- specificProperties.setProperty("log.retention.ms", "250");
- specificProperties.setProperty("log.retention.check.interval.ms", "100");
- kafkaServer.prepare(1, specificProperties, false);
-
- standardProps = kafkaServer.getStandardProperties();
-
- // start also a re-usable Flink mini cluster
- flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
-
- flink = new LocalFlinkMiniCluster(flinkConfig, false);
- flink.start();
- }
-
- @AfterClass
- public static void shutDownServices() {
- if (flink != null) {
- flink.shutdown();
- }
- kafkaServer.shutdown();
-
- secureProps.clear();
- }
-
- /**
- * This test is concurrently reading and writing from a kafka topic.
- * The job will run for a while
- * In a special deserializationSchema, we make sure that the offsets from the topic
- * are non-continuous (because the data is expiring faster than its consumed --> with auto.offset.reset = 'earliest', some offsets will not show up)
- *
- */
- private static boolean stopProducer = false;
-
- public void runAutoOffsetResetTest() throws Exception {
- final String topic = "auto-offset-reset-test";
-
- final int parallelism = 1;
- final int elementsPerPartition = 50000;
-
- Properties tprops = new Properties();
- tprops.setProperty("retention.ms", "250");
- kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
-
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
- env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
- env.getConfig().disableSysoutLogging();
-
-
- // ----------- add producer dataflow ----------
-
-
- DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws InterruptedException {
- int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
- int limit = cnt + elementsPerPartition;
-
-
- while (running && !stopProducer && cnt < limit) {
- ctx.collect("element-" + cnt);
- cnt++;
- Thread.sleep(10);
- }
- LOG.info("Stopping producer");
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
-
- // ----------- add consumer dataflow ----------
-
- NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
-
- DataStreamSource<String> consuming = env.addSource(source);
- consuming.addSink(new DiscardingSink<String>());
-
- tryExecute(env, "run auto offset reset test");
-
- kafkaServer.deleteTestTopic(topic);
- }
-
-
- private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
- private int numJumps;
- long nextExpected = 0;
-
- @Override
- public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- if(offset != nextExpected) {
- numJumps++;
- nextExpected = offset;
- LOG.info("Registered now jump at offset {}", offset);
- }
- nextExpected++;
- try {
- Thread.sleep(10); // slow down data consumption to trigger log eviction
- } catch (InterruptedException e) {
- throw new RuntimeException("Stopping it");
- }
- return "";
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- if( numJumps >= 5) {
- // we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
- stopProducer = true;
- return true;
- }
- return false;
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return TypeInfoParser.parse("String");
- }
- }
-
-
- /**
- * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none"
- * @throws Exception
- */
- public void runFailOnAutoOffsetResetNone() throws Exception {
- final String topic = "auto-offset-reset-none-test";
- final int parallelism = 1;
-
- kafkaServer.createTestTopic(topic, parallelism, 1);
-
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
- env.setParallelism(parallelism);
- env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
- env.getConfig().disableSysoutLogging();
-
- // ----------- add consumer ----------
-
- Properties customProps = new Properties();
- customProps.putAll(standardProps);
- customProps.putAll(secureProps);
- customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
- FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
-
- DataStreamSource<String> consuming = env.addSource(source);
- consuming.addSink(new DiscardingSink<String>());
-
- try {
- env.execute("Test auto offset reset none");
- } catch(Throwable e) {
- System.out.println("MESSAGE: " + e.getCause().getCause().getMessage());
- // check if correct exception has been thrown
- if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8
- && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
- ) {
- throw e;
- }
- }
-
- kafkaServer.deleteTestTopic(topic);
- }
-
- public void runFailOnAutoOffsetResetNoneEager() throws Exception {
- final String topic = "auto-offset-reset-none-test";
- final int parallelism = 1;
-
- kafkaServer.createTestTopic(topic, parallelism, 1);
-
- // ----------- add consumer ----------
-
- Properties customProps = new Properties();
- customProps.putAll(standardProps);
- customProps.putAll(secureProps);
- customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
-
- try {
- kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- assertTrue(e.getMessage().contains("none"));
- }
-
- kafkaServer.deleteTestTopic(topic);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
deleted file mode 100644
index ae0af52..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.Properties;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-public abstract class KafkaTableSinkTestBase {
-
- private static final String TOPIC = "testTopic";
- protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
- private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
- private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
- private static final Properties PROPERTIES = createSinkProperties();
- @SuppressWarnings("unchecked")
- private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
- TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
-
- @Override
- protected void flush() {}
- };
-
- @Test
- @SuppressWarnings("unchecked")
- public void testKafkaTableSink() throws Exception {
- DataStream dataStream = mock(DataStream.class);
-
- KafkaTableSink kafkaTableSink = spy(createTableSink());
- kafkaTableSink.emitDataStream(dataStream);
-
- verify(dataStream).addSink(eq(PRODUCER));
-
- verify(kafkaTableSink).createKafkaProducer(
- eq(TOPIC),
- eq(PROPERTIES),
- any(getSerializationSchema().getClass()),
- eq(PARTITIONER));
- }
-
- @Test
- public void testConfiguration() {
- KafkaTableSink kafkaTableSink = createTableSink();
- KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
- assertNotSame(kafkaTableSink, newKafkaTableSink);
-
- assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
- assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
- assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
- }
-
- protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
- KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
-
- protected abstract SerializationSchema<Row> getSerializationSchema();
-
- private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
- }
-
- private static Properties createSinkProperties() {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", "localhost:12345");
- return properties;
- }
-
- private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
- @Override
- public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
deleted file mode 100644
index 2a281e8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.junit.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-public abstract class KafkaTableSourceTestBase {
-
- private static final String TOPIC = "testTopic";
- private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" };
- private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO };
- private static final Properties PROPERTIES = createSourceProperties();
-
- @Test
- public void testKafkaTableSource() {
- KafkaTableSource kafkaTableSource = spy(createTableSource());
- StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
- kafkaTableSource.getDataStream(env);
-
- verify(env).addSource(any(getFlinkKafkaConsumer()));
-
- verify(kafkaTableSource).getKafkaConsumer(
- eq(TOPIC),
- eq(PROPERTIES),
- any(getDeserializationSchema()));
- }
-
- protected abstract KafkaTableSource createTableSource(String topic, Properties properties,
- String[] fieldNames, TypeInformation<?>[] typeInfo);
-
- protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
-
- protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
-
- private KafkaTableSource createTableSource() {
- return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES);
- }
-
- private static Properties createSourceProperties() {
- Properties properties = new Properties();
- properties.setProperty("zookeeper.connect", "dummy");
- properties.setProperty("group.id", "dummy");
- return properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
deleted file mode 100644
index 5cec4f0..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.jmx.JMXReporter;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- * <li>A ZooKeeper mini cluster</li>
- * <li>Three Kafka Brokers (mini clusters)</li>
- * <li>A Flink mini cluster</li>
- * </ul>
- *
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase extends TestLogger {
-
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-
- protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
- protected static String brokerConnectionStrings;
-
- protected static Properties standardProps;
-
- protected static LocalFlinkMiniCluster flink;
-
- protected static int flinkPort;
-
- protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
- protected static KafkaTestEnvironment kafkaServer;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- protected static Properties secureProps = new Properties();
-
- // ------------------------------------------------------------------------
- // Setup and teardown of the mini clusters
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void prepare() throws IOException, ClassNotFoundException {
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting KafkaTestBase ");
- LOG.info("-------------------------------------------------------------------------");
-
- startClusters(false);
-
- }
-
- @AfterClass
- public static void shutDownServices() {
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Shut down KafkaTestBase ");
- LOG.info("-------------------------------------------------------------------------");
-
- shutdownClusters();
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" KafkaTestBase finished");
- LOG.info("-------------------------------------------------------------------------");
- }
-
- protected static Configuration getFlinkConfiguration() {
- Configuration flinkConfig = new Configuration();
- flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
- flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
- return flinkConfig;
- }
-
- protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
-
- // dynamically load the implementation for the test
- Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
- kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
-
- LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
-
- kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
-
- standardProps = kafkaServer.getStandardProperties();
-
- brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
-
- if (secureMode) {
- if (!kafkaServer.isSecureRunSupported()) {
- throw new IllegalStateException(
- "Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
- }
- secureProps = kafkaServer.getSecureProperties();
- }
-
- // start also a re-usable Flink mini cluster
- flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
- flink.start();
-
- flinkPort = flink.getLeaderRPCPort();
-
- }
-
- protected static void shutdownClusters() {
-
- flinkPort = -1;
- if (flink != null) {
- flink.shutdown();
- }
-
- if(secureProps != null) {
- secureProps.clear();
- }
-
- kafkaServer.shutdown();
-
- }
-
-
-
- // ------------------------------------------------------------------------
- // Execution utilities
- // ------------------------------------------------------------------------
-
-
- protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- }
- catch (ProgramInvocationException | JobExecutionException root) {
- Throwable cause = root.getCause();
-
- // search for nested SuccessExceptions
- int depth = 0;
- while (!(cause instanceof SuccessException)) {
- if (cause == null || depth++ == 20) {
- throw root;
- }
- else {
- cause = cause.getCause();
- }
- }
- }
- }
-
- protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
- kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
- }
-
- protected static void deleteTestTopic(String topic) {
- kafkaServer.deleteTestTopic(topic);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
deleted file mode 100644
index 10c7b86..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.server.KafkaServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Properties;
-
-/**
- * Abstract class providing a Kafka test environment
- */
-public abstract class KafkaTestEnvironment {
-
- protected static final String KAFKA_HOST = "localhost";
-
- public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
- public void prepare(int numberOfKafkaServers, boolean secureMode) {
- this.prepare(numberOfKafkaServers, null, secureMode);
- }
-
- public abstract void shutdown();
-
- public abstract void deleteTestTopic(String topic);
-
- public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties);
-
- public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
- this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
- }
-
- public abstract Properties getStandardProperties();
-
- public abstract Properties getSecureProperties();
-
- public abstract String getBrokerConnectionString();
-
- public abstract String getVersion();
-
- public abstract List<KafkaServer> getBrokers();
-
- // -- consumer / producer instances:
- public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
- return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
- }
-
- public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) {
- return getConsumer(Collections.singletonList(topic), readSchema, props);
- }
-
- public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
- return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
- }
-
- public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
-
- public abstract <T> StreamSink<T> getProducerSink(String topic,
- KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
-
- public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
- KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
-
- // -- offset handlers
-
- public interface KafkaOffsetHandler {
- Long getCommittedOffset(String topicName, int partition);
- void close();
- }
-
- public abstract KafkaOffsetHandler createOffsetHandler(Properties props);
-
- // -- leader failure simulation
-
- public abstract void restartBroker(int leaderId) throws Exception;
-
- public abstract int getLeaderToShutDown(String topic) throws Exception;
-
- public abstract int getBrokerId(KafkaServer server);
-
- public abstract boolean isSecureRunSupported();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 4, partitions);
- Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
-
- part.open(2, 4, partitions);
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
-
- part.open(3, 4, partitions);
- Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.open(0, 2, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 2, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
- int[] partitions = new int[]{0,1};
-
- part.open(0, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 3, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-
- part.open(2, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
deleted file mode 100644
index 0b3507a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class AbstractFetcherTimestampsTest {
-
- @Test
- public void testPunctuatedWatermarks() throws Exception {
- final String testTopic = "test topic name";
- List<KafkaTopicPartition> originalPartitions = Arrays.asList(
- new KafkaTopicPartition(testTopic, 7),
- new KafkaTopicPartition(testTopic, 13),
- new KafkaTopicPartition(testTopic, 21));
-
- TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
- TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
-
- TestFetcher<Long> fetcher = new TestFetcher<>(
- sourceContext,
- originalPartitions,
- null, /* periodic watermark assigner */
- new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
- processingTimeProvider,
- 0);
-
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
-
- // elements generate a watermark if the timestamp is a multiple of three
-
- // elements for partition 1
- fetcher.emitRecord(1L, part1, 1L);
- fetcher.emitRecord(2L, part1, 2L);
- fetcher.emitRecord(3L, part1, 3L);
- assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
- assertFalse(sourceContext.hasWatermark());
-
- // elements for partition 2
- fetcher.emitRecord(12L, part2, 1L);
- assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
- assertFalse(sourceContext.hasWatermark());
-
- // elements for partition 3
- fetcher.emitRecord(101L, part3, 1L);
- fetcher.emitRecord(102L, part3, 2L);
- assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
- // now, we should have a watermark
- assertTrue(sourceContext.hasWatermark());
- assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 3
- fetcher.emitRecord(1003L, part3, 3L);
- fetcher.emitRecord(1004L, part3, 4L);
- fetcher.emitRecord(1005L, part3, 5L);
- assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
- // advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L);
- assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
- assertTrue(sourceContext.hasWatermark());
- assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L);
- assertFalse(sourceContext.hasWatermark());
- fetcher.emitRecord(14L, part2, 3L);
- assertFalse(sourceContext.hasWatermark());
- fetcher.emitRecord(15L, part2, 3L);
- assertTrue(sourceContext.hasWatermark());
- assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
- }
-
- @Test
- public void testPeriodicWatermarks() throws Exception {
- final String testTopic = "test topic name";
- List<KafkaTopicPartition> originalPartitions = Arrays.asList(
- new KafkaTopicPartition(testTopic, 7),
- new KafkaTopicPartition(testTopic, 13),
- new KafkaTopicPartition(testTopic, 21));
-
- TestSourceContext<Long> sourceContext = new TestSourceContext<>();
-
- TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-
- TestFetcher<Long> fetcher = new TestFetcher<>(
- sourceContext,
- originalPartitions,
- new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
- null, /* punctuated watermarks assigner*/
- processingTimeService,
- 10);
-
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
-
- // elements generate a watermark if the timestamp is a multiple of three
-
- // elements for partition 1
- fetcher.emitRecord(1L, part1, 1L);
- fetcher.emitRecord(2L, part1, 2L);
- fetcher.emitRecord(3L, part1, 3L);
- assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-
- // elements for partition 2
- fetcher.emitRecord(12L, part2, 1L);
- assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
-
- // elements for partition 3
- fetcher.emitRecord(101L, part3, 1L);
- fetcher.emitRecord(102L, part3, 2L);
- assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
- processingTimeService.setCurrentTime(10);
-
- // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
- assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 3
- fetcher.emitRecord(1003L, part3, 3L);
- fetcher.emitRecord(1004L, part3, 4L);
- fetcher.emitRecord(1005L, part3, 5L);
- assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
- // advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L);
- assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-
- processingTimeService.setCurrentTime(20);
-
- // this blocks until the periodic thread emitted the watermark
- assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L);
- fetcher.emitRecord(14L, part2, 3L);
- fetcher.emitRecord(15L, part2, 3L);
-
- processingTimeService.setCurrentTime(30);
- // this blocks until the periodic thread emitted the watermark
- long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
- assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
- }
-
- // ------------------------------------------------------------------------
- // Test mocks
- // ------------------------------------------------------------------------
-
- private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
-
- protected TestFetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- ProcessingTimeService processingTimeProvider,
- long autoWatermarkInterval) throws Exception
- {
- super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
- }
-
- @Override
- public void runFetchLoop() throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void cancel() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
- return new Object();
- }
-
- @Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
- throw new UnsupportedOperationException();
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static final class TestSourceContext<T> implements SourceContext<T> {
-
- private final Object checkpointLock = new Object();
- private final Object watermarkLock = new Object();
-
- private volatile StreamRecord<T> latestElement;
- private volatile Watermark currentWatermark;
-
- @Override
- public void collect(T element) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- this.latestElement = new StreamRecord<>(element, timestamp);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- synchronized (watermarkLock) {
- currentWatermark = mark;
- watermarkLock.notifyAll();
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return checkpointLock;
- }
-
- @Override
- public void close() {}
-
- public StreamRecord<T> getLatestElement() {
- return latestElement;
- }
-
- public boolean hasWatermark() {
- return currentWatermark != null;
- }
-
- public Watermark getLatestWatermark() throws InterruptedException {
- synchronized (watermarkLock) {
- while (currentWatermark == null) {
- watermarkLock.wait();
- }
- Watermark wm = currentWatermark;
- currentWatermark = null;
- return wm;
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
-
- private volatile long maxTimestamp = Long.MIN_VALUE;
-
- @Override
- public long extractTimestamp(Long element, long previousElementTimestamp) {
- maxTimestamp = Math.max(maxTimestamp, element);
- return element;
- }
-
- @Nullable
- @Override
- public Watermark getCurrentWatermark() {
- return new Watermark(maxTimestamp);
- }
- }
-
- private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
-
- @Override
- public long extractTimestamp(Long element, long previousElementTimestamp) {
- return element;
- }
-
- @Nullable
- @Override
- public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
- return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
deleted file mode 100644
index 0e16263..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import static org.junit.Assert.*;
-
-public class KafkaTopicPartitionTest {
-
- @Test
- public void validateUid() {
- Field uidField;
- try {
- uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
- uidField.setAccessible(true);
- }
- catch (NoSuchFieldException e) {
- fail("serialVersionUID is not defined");
- return;
- }
-
- assertTrue(Modifier.isStatic(uidField.getModifiers()));
- assertTrue(Modifier.isFinal(uidField.getModifiers()));
- assertTrue(Modifier.isPrivate(uidField.getModifiers()));
-
- assertEquals(long.class, uidField.getType());
-
- // the UID has to be constant to make sure old checkpoints/savepoints can be read
- try {
- assertEquals(722083576322742325L, uidField.getLong(null));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
deleted file mode 100644
index 9e8e1d9..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
-import static org.mockito.Mockito.mock;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-
- public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
- KafkaTestEnvironment testServer, String topic,
- final int numPartitions,
- final int numElements,
- final boolean randomizeOrder) throws Exception {
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- DataStream<Integer> stream = env.addSource(
- new RichParallelSourceFunction<Integer>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Integer> ctx) {
- // create a sequence
- int[] elements = new int[numElements];
- for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
- i < numElements;
- i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-
- elements[i] = val;
- }
-
- // scramble the sequence
- if (randomizeOrder) {
- Random rnd = new Random();
- for (int i = 0; i < elements.length; i++) {
- int otherPos = rnd.nextInt(elements.length);
-
- int tmp = elements[i];
- elements[i] = elements[otherPos];
- elements[otherPos] = tmp;
- }
- }
-
- // emit the sequence
- int pos = 0;
- while (running && pos < elements.length) {
- ctx.collect(elements[pos++]);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- Properties props = new Properties();
- props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
- Properties secureProps = testServer.getSecureProperties();
- if(secureProps != null) {
- props.putAll(testServer.getSecureProperties());
- }
-
- stream = stream.rebalance();
- testServer.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
- props,
- new KafkaPartitioner<Integer>() {
- @Override
- public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return next % numPartitions;
- }
- });
-
- env.execute("Scrambles int sequence generator");
- }
-
- // ------------------------------------------------------------------------
-
- public static class InfiniteStringsGenerator extends Thread {
-
- private final KafkaTestEnvironment server;
-
- private final String topic;
-
- private volatile Throwable error;
-
- private volatile boolean running = true;
-
-
- public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
- this.server = server;
- this.topic = topic;
- }
-
- @Override
- public void run() {
- // we manually feed data into the Kafka sink
- RichFunction producer = null;
- try {
- Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
- producerProperties.setProperty("retries", "3");
- StreamTransformation<String> mockTransform = new MockStreamTransformation();
- DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
-
- StreamSink<String> sink = server.getProducerSink(
- topic,
- new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
- producerProperties,
- new FixedPartitioner<String>());
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(sink);
-
- testHarness.open();
-
- final StringBuilder bld = new StringBuilder();
- final Random rnd = new Random();
-
- while (running) {
- bld.setLength(0);
-
- int len = rnd.nextInt(100) + 1;
- for (int i = 0; i < len; i++) {
- bld.append((char) (rnd.nextInt(20) + 'a') );
- }
-
- String next = bld.toString();
- testHarness.processElement(new StreamRecord<>(next));
- }
- }
- catch (Throwable t) {
- this.error = t;
- }
- finally {
- if (producer != null) {
- try {
- producer.close();
- }
- catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- public void shutdown() {
- this.running = false;
- this.interrupt();
- }
-
- public Throwable getError() {
- return this.error;
- }
-
- private static class MockStreamTransformation extends StreamTransformation<String> {
- public MockStreamTransformation() {
- super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
- }
-
- @Override
- public void setChainingStrategy(ChainingStrategy strategy) {
-
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- return null;
- }
- }
-
- public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- return null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 2bd400c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
- Checkpointed<Integer>, CheckpointListener, Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-
- private static final long serialVersionUID = 6334389850158707313L;
-
- public static volatile boolean failedBefore;
- public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
- private final int failCount;
- private int numElementsTotal;
- private int numElementsThisTime;
-
- private boolean failer;
- private boolean hasBeenCheckpointed;
-
- private Thread printer;
- private volatile boolean printerRunning = true;
-
- public FailingIdentityMapper(int failCount) {
- this.failCount = failCount;
- }
-
- @Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
- printer = new Thread(this, "FailingIdentityMapper Status Printer");
- printer.start();
- }
-
- @Override
- public T map(T value) throws Exception {
- numElementsTotal++;
- numElementsThisTime++;
-
- if (!failedBefore) {
- Thread.sleep(10);
-
- if (failer && numElementsTotal >= failCount) {
- hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
- failedBefore = true;
- throw new Exception("Artificial Test Failure");
- }
- }
- return value;
- }
-
- @Override
- public void close() throws Exception {
- printerRunning = false;
- if (printer != null) {
- printer.interrupt();
- printer = null;
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- this.hasBeenCheckpointed = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
- }
-
- @Override
- public void restoreState(Integer state) {
- numElementsTotal = state;
- }
-
- @Override
- public void run() {
- while (printerRunning) {
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException e) {
- // ignore
- }
- LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
- getRuntimeContext().getIndexOfThisSubtask(),
- numElementsThisTime, numElementsTotal);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
deleted file mode 100644
index 055326d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-import java.util.Properties;
-
-public class FakeStandardProducerConfig {
-
- public static Properties get() {
- Properties p = new Properties();
- p.setProperty("bootstrap.servers", "localhost:12345");
- p.setProperty("key.serializer", ByteArraySerializer.class.getName());
- p.setProperty("value.serializer", ByteArraySerializer.class.getName());
- return p;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index acdad5a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-
- private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-
- public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
- while (true) {
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
-
- Object result = Await.result(listResponse, askTimeout);
- List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-
-
- if (jobs.isEmpty()) {
- return;
- }
-
- Thread.sleep(50);
- }
- }
-
- public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
- cancelCurrentJob(jobManager, null);
- }
-
- public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
- JobStatusMessage status = null;
-
- for (int i = 0; i < 200; i++) {
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
- }
-
- if (jobs.isEmpty()) {
- // try again, fall through the loop
- Thread.sleep(50);
- }
- else if (jobs.size() == 1) {
- status = jobs.get(0);
- }
- else if(name != null) {
- for(JobStatusMessage msg: jobs) {
- if(msg.getJobName().equals(name)) {
- status = msg;
- }
- }
- if(status == null) {
- throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
- }
- } else {
- String jobNames = "";
- for(JobStatusMessage jsm: jobs) {
- jobNames += jsm.getJobName() + ", ";
- }
- throw new Exception("Could not cancel job - more than one running job: " + jobNames);
- }
- }
-
- if (status == null) {
- throw new Exception("Could not cancel job - no running jobs");
- }
- else if (status.getJobState().isGloballyTerminalState()) {
- throw new Exception("Could not cancel job - job is not running any more");
- }
-
- JobID jobId = status.getJobId();
-
- Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
- try {
- Await.result(response, askTimeout);
- }
- catch (Exception e) {
- throw new Exception("Sending the 'cancel' message failed.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
deleted file mode 100644
index e105e01..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1088381231244959088L;
-
- /* the partitions from which this function received data */
- private final Set<Integer> myPartitions = new HashSet<>();
-
- private final int numPartitions;
- private final int maxPartitions;
-
- public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
- this.numPartitions = numPartitions;
- this.maxPartitions = maxPartitions;
- }
-
- @Override
- public Integer map(Integer value) throws Exception {
- // validate that the partitioning is identical
- int partition = value % numPartitions;
- myPartitions.add(partition);
- if (myPartitions.size() > maxPartitions) {
- throw new Exception("Error: Elements from too many different partitions: " + myPartitions
- + ". Expect elements only from " + maxPartitions + " partitions");
- }
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
deleted file mode 100644
index 1d61229..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-/**
- * An identity map function that sleeps between elements, throttling the
- * processing speed.
- *
- * @param <T> The type mapped.
- */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
-
- private static final long serialVersionUID = 467008933767159126L;
-
- private final int sleep;
-
- public ThrottledMapper(int sleep) {
- this.sleep = sleep;
- }
-
- @Override
- public T map(T value) throws Exception {
- Thread.sleep(this.sleep);
- return value;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index c9e9ac1..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-import java.io.Serializable;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
- */
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final int expectedPartitions;
-
- public Tuple2Partitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (numPartitions != expectedPartitions) {
- throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
- }
-
- return next.f0;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
deleted file mode 100644
index 7813561..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.test.util.SuccessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-
-public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
-
- private static final long serialVersionUID = 1748426382527469932L;
-
- private final int numElementsTotal;
-
- private BitSet duplicateChecker = new BitSet(); // this is checkpointed
-
- private int numElements; // this is checkpointed
-
-
- public ValidatingExactlyOnceSink(int numElementsTotal) {
- this.numElementsTotal = numElementsTotal;
- }
-
-
- @Override
- public void invoke(Integer value) throws Exception {
- numElements++;
-
- if (duplicateChecker.get(value)) {
- throw new Exception("Received a duplicate: " + value);
- }
- duplicateChecker.set(value);
- if (numElements == numElementsTotal) {
- // validate
- if (duplicateChecker.cardinality() != numElementsTotal) {
- throw new Exception("Duplicate checker has wrong cardinality");
- }
- else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
- throw new Exception("Received sparse sequence");
- }
- else {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
- LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
- return new Tuple2<>(numElements, duplicateChecker);
- }
-
- @Override
- public void restoreState(Tuple2<Integer, BitSet> state) {
- LOG.info("restoring num elements to {}", state.f0);
- this.numElements = state.f0;
- this.duplicateChecker = state.f1;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
deleted file mode 100644
index 8a4c408..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
-
-/**
- * Simple ZooKeeper serializer for Strings.
- */
-public class ZooKeeperStringSerializer implements ZkSerializer {
-
- private static final Charset CHARSET = Charset.forName("UTF-8");
-
- @Override
- public byte[] serialize(Object data) {
- if (data instanceof String) {
- return ((String) data).getBytes(CHARSET);
- }
- else {
- throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
- }
- }
-
- @Override
- public Object deserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- else {
- return new String(bytes, CHARSET);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-