You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:02 UTC
[46/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
deleted file mode 100644
index d511796..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- * <li>A ZooKeeper mini cluster</li>
- * <li>Three Kafka Brokers (mini clusters)</li>
- * <li>A Flink mini cluster</li>
- * </ul>
- *
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase extends TestLogger {
-
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-
- protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
- protected static String zookeeperConnectionString;
-
- protected static File tmpZkDir;
-
- protected static File tmpKafkaParent;
-
- protected static TestingServer zookeeper;
- protected static List<KafkaServer> brokers;
- protected static String brokerConnectionStrings = "";
-
- protected static ConsumerConfig standardCC;
- protected static Properties standardProps;
-
- protected static ForkableFlinkMiniCluster flink;
-
- protected static int flinkPort;
-
- protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
- protected static List<File> tmpKafkaDirs;
-
- protected static String kafkaHost = "localhost";
-
- // ------------------------------------------------------------------------
- // Setup and teardown of the mini clusters
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void prepare() throws IOException {
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting KafkaITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- LOG.info("Starting KafkaITCase.prepare()");
-
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
- tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
- assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
- tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
- assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
- tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- File tmpDir = new File(tmpKafkaParent, "server-" + i);
- assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
- tmpKafkaDirs.add(tmpDir);
- }
-
- zookeeper = null;
- brokers = null;
-
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(-1, tmpZkDir);
- zookeeperConnectionString = zookeeper.getConnectString();
-
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
-
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
- SocketServer socketServer = brokers.get(i).socketServer();
-
- String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
- }
-
- LOG.info("ZK and KafkaServer started.");
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Test setup failed: " + t.getMessage());
- }
-
- standardProps = new Properties();
-
- standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
- standardProps.setProperty("group.id", "flink-tests");
- standardProps.setProperty("auto.commit.enable", "false");
- standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
- standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
- standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
- standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
- Properties consumerConfigProps = new Properties();
- consumerConfigProps.putAll(standardProps);
- consumerConfigProps.setProperty("auto.offset.reset", "smallest");
- standardCC = new ConsumerConfig(consumerConfigProps);
-
- // start also a re-usable Flink mini cluster
-
- Configuration flinkConfig = new Configuration();
- flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
-
- flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
- flink.start();
-
- flinkPort = flink.getLeaderRPCPort();
- }
-
- @AfterClass
- public static void shutDownServices() {
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Shut down KafkaITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- flinkPort = -1;
- if (flink != null) {
- flink.shutdown();
- }
-
- for (KafkaServer broker : brokers) {
- if (broker != null) {
- broker.shutdown();
- }
- }
- brokers.clear();
-
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- }
- catch (Exception e) {
- LOG.warn("ZK.stop() failed", e);
- }
- zookeeper = null;
- }
-
- // clean up the temp spaces
-
- if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
- try {
- FileUtils.deleteDirectory(tmpKafkaParent);
- }
- catch (Exception e) {
- // ignore
- }
- }
- if (tmpZkDir != null && tmpZkDir.exists()) {
- try {
- FileUtils.deleteDirectory(tmpZkDir);
- }
- catch (Exception e) {
- // ignore
- }
- }
-
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" KafkaITCase finished");
- LOG.info("-------------------------------------------------------------------------");
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
- String kafkaHost,
- String zookeeperConnectionString) throws Exception {
- Properties kafkaProperties = new Properties();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", kafkaHost);
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-
- // for CI stability, increase zookeeper session timeout
- kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
-
- final int numTries = 5;
-
- for (int i = 1; i <= numTries; i++) {
- int kafkaPort = NetUtils.getAvailablePort();
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- try {
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
- catch (KafkaException e) {
- if (e.getCause() instanceof BindException) {
- // port conflict, retry...
- LOG.info("Port conflict when starting Kafka Broker. Retrying...");
- }
- else {
- throw e;
- }
- }
- }
-
- throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
- }
-
- // ------------------------------------------------------------------------
- // Execution utilities
- // ------------------------------------------------------------------------
-
- protected ZkClient createZookeeperClient() {
- return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
- }
-
- protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- }
- catch (ProgramInvocationException | JobExecutionException root) {
- Throwable cause = root.getCause();
-
- // search for nested SuccessExceptions
- int depth = 0;
- while (!(cause instanceof SuccessException)) {
- if (cause == null || depth++ == 20) {
- root.printStackTrace();
- fail("Test failed: " + root.getMessage());
- }
- else {
- cause = cause.getCause();
- }
- }
- }
- }
-
- protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- }
- catch (ProgramInvocationException | JobExecutionException root) {
- Throwable cause = root.getCause();
-
- // search for nested SuccessExceptions
- int depth = 0;
- while (!(cause instanceof SuccessException)) {
- if (cause == null || depth++ == 20) {
- throw root;
- }
- else {
- cause = cause.getCause();
- }
- }
- }
- }
-
-
-
- protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-
- // create topic with one client
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topic);
-
- ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
- AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
- creator.close();
-
- // validate that the topic has been created
- final long deadline = System.currentTimeMillis() + 30000;
- do {
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- // restore interrupted state
- }
- List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
- if (partitions != null && partitions.size() > 0) {
- return;
- }
- }
- while (System.currentTimeMillis() < deadline);
- fail ("Test topic could not be created");
- }
-
- protected static void deleteTestTopic(String topic) {
- LOG.info("Deleting topic {}", topic);
-
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
- standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
- AdminUtils.deleteTopic(zk, topic);
-
- zk.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 75fdd46..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FixedPartitioner part = new FixedPartitioner();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4, partitions);
- Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
- part.open(1, 4, partitions);
- Assert.assertEquals(0, part.partition("abc2", partitions.length));
-
- part.open(2, 4, partitions);
- Assert.assertEquals(0, part.partition("abc3", partitions.length));
- Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
-
- part.open(3, 4, partitions);
- Assert.assertEquals(0, part.partition("abc4", partitions.length));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FixedPartitioner part = new FixedPartitioner();
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.open(0, 2, partitions);
- Assert.assertEquals(0, part.partition("abc1", partitions.length));
- Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
- part.open(1, 2, partitions);
- Assert.assertEquals(1, part.partition("abc1", partitions.length));
- Assert.assertEquals(1, part.partition("abc1", partitions.length));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FixedPartitioner part = new FixedPartitioner();
- int[] partitions = new int[]{0,1};
-
- part.open(0, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
- part.open(1, 3, partitions);
- Assert.assertEquals(1, part.partition("abc1", partitions.length));
-
- part.open(2, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index 27ad2e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-
- @Test
- public void runOffsetManipulationinZooKeeperTest() {
- try {
- final String topicName = "ZookeeperOffsetHandlerTest-Topic";
- final String groupId = "ZookeeperOffsetHandlerTest-Group";
-
- final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
- ZkClient zkClient = createZookeeperClient();
- AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
-
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
-
- long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
-
- zkClient.close();
-
- assertEquals(offset, fetchedOffset);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
deleted file mode 100644
index 32377ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-
- public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
- String brokerConnection, String topic,
- int numPartitions,
- final int from, final int to) throws Exception {
-
- TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setNumberOfExecutionRetries(0);
-
- DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
- new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int cnt = from;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
-
- while (running && cnt <= to) {
- ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream.addSink(new FlinkKafkaProducer<>(topic,
- new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
- FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
- new Tuple2Partitioner(numPartitions)
- ));
-
- env.execute("Data generator (Int, Int) stream to topic " + topic);
- }
-
- // ------------------------------------------------------------------------
-
- public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
- String brokerConnection, String topic,
- final int numPartitions,
- final int numElements,
- final boolean randomizeOrder) throws Exception {
- env.setParallelism(numPartitions);
- env.getConfig().disableSysoutLogging();
- env.setNumberOfExecutionRetries(0);
-
- DataStream<Integer> stream = env.addSource(
- new RichParallelSourceFunction<Integer>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Integer> ctx) {
- // create a sequence
- int[] elements = new int[numElements];
- for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
- i < numElements;
- i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-
- elements[i] = val;
- }
-
- // scramble the sequence
- if (randomizeOrder) {
- Random rnd = new Random();
- for (int i = 0; i < elements.length; i++) {
- int otherPos = rnd.nextInt(elements.length);
-
- int tmp = elements[i];
- elements[i] = elements[otherPos];
- elements[otherPos] = tmp;
- }
- }
-
- // emit the sequence
- int pos = 0;
- while (running && pos < elements.length) {
- ctx.collect(elements[pos++]);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream
- .rebalance()
- .addSink(new FlinkKafkaProducer<>(topic,
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
- FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
- new KafkaPartitioner() {
- @Override
- public int partition(Object key, int numPartitions) {
- return ((Integer) key) % numPartitions;
- }
- }));
-
- env.execute("Scrambles int sequence generator");
- }
-
- // ------------------------------------------------------------------------
-
- public static class InfiniteStringsGenerator extends Thread {
-
- private final String kafkaConnectionString;
-
- private final String topic;
-
- private volatile Throwable error;
-
- private volatile boolean running = true;
-
-
- public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
- this.kafkaConnectionString = kafkaConnectionString;
- this.topic = topic;
- }
-
- @Override
- public void run() {
- // we manually feed data into the Kafka sink
- FlinkKafkaProducer<String> producer = null;
- try {
- producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
- producer.setRuntimeContext(new MockRuntimeContext(1,0));
- producer.open(new Configuration());
-
- final StringBuilder bld = new StringBuilder();
- final Random rnd = new Random();
-
- while (running) {
- bld.setLength(0);
-
- int len = rnd.nextInt(100) + 1;
- for (int i = 0; i < len; i++) {
- bld.append((char) (rnd.nextInt(20) + 'a') );
- }
-
- String next = bld.toString();
- producer.invoke(next);
- }
- }
- catch (Throwable t) {
- this.error = t;
- }
- finally {
- if (producer != null) {
- try {
- producer.close();
- }
- catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- public void shutdown() {
- this.running = false;
- this.interrupt();
- }
-
- public Throwable getError() {
- return this.error;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
deleted file mode 100644
index 987e6c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the function.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
- private static final long serialVersionUID = 2777597566520109843L;
-
- @Override
- public void invoke(T value) {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 5a8ffaa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
- Checkpointed<Integer>, CheckpointNotifier, Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-
- private static final long serialVersionUID = 6334389850158707313L;
-
- public static volatile boolean failedBefore;
- public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
- private final int failCount;
- private int numElementsTotal;
- private int numElementsThisTime;
-
- private boolean failer;
- private boolean hasBeenCheckpointed;
-
- private Thread printer;
- private volatile boolean printerRunning = true;
-
- public FailingIdentityMapper(int failCount) {
- this.failCount = failCount;
- }
-
- @Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
- printer = new Thread(this, "FailingIdentityMapper Status Printer");
- printer.start();
- }
-
- @Override
- public T map(T value) throws Exception {
- numElementsTotal++;
- numElementsThisTime++;
-
- if (!failedBefore) {
- Thread.sleep(10);
-
- if (failer && numElementsTotal >= failCount) {
- hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
- failedBefore = true;
- throw new Exception("Artificial Test Failure");
- }
- }
- return value;
- }
-
- @Override
- public void close() throws Exception {
- printerRunning = false;
- if (printer != null) {
- printer.interrupt();
- printer = null;
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- this.hasBeenCheckpointed = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
- }
-
- @Override
- public void restoreState(Integer state) {
- numElementsTotal = state;
- }
-
- @Override
- public void run() {
- while (printerRunning) {
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException e) {
- // ignore
- }
- LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
- getRuntimeContext().getIndexOfThisSubtask(),
- numElementsThisTime, numElementsTotal);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index e94adb5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-
- private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-
-
- public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-
- // find the jobID
- Future<Object> listResponse = jobManager.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- askTimeout);
-
- List<JobStatusMessage> jobs;
- try {
- Object result = Await.result(listResponse, askTimeout);
- jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
- }
- catch (Exception e) {
- throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
- }
-
- if (jobs.isEmpty()) {
- throw new Exception("Could not cancel job - no running jobs");
- }
- if (jobs.size() != 1) {
- throw new Exception("Could not cancel job - more than one running job.");
- }
-
- JobStatusMessage status = jobs.get(0);
- if (status.getJobState().isTerminalState()) {
- throw new Exception("Could not cancel job - job is not running any more");
- }
-
- JobID jobId = status.getJobId();
-
- Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
- try {
- Await.result(response, askTimeout);
- }
- catch (Exception e) {
- throw new Exception("Sending the 'cancel' message failed.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
deleted file mode 100644
index b9fc3de..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MockRuntimeContext implements RuntimeContext {
-
- private final int numberOfParallelSubtasks;
- private final int indexOfThisSubtask;
-
- public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
- this.numberOfParallelSubtasks = numberOfParallelSubtasks;
- this.indexOfThisSubtask = indexOfThisSubtask;
- }
-
-
- @Override
- public String getTaskName() {
- return null;
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return numberOfParallelSubtasks;
- }
-
- @Override
- public int getIndexOfThisSubtask() {
- return indexOfThisSubtask;
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ClassLoader getUserCodeClassLoader() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, Accumulator<?, ?>> getAllAccumulators() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IntCounter getIntCounter(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LongCounter getLongCounter(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoubleCounter getDoubleCounter(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Histogram getHistogram(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <RT> List<RT> getBroadcastVariable(String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DistributedCache getDistributedCache() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
deleted file mode 100644
index e105e01..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1088381231244959088L;
-
- /* the partitions from which this function received data */
- private final Set<Integer> myPartitions = new HashSet<>();
-
- private final int numPartitions;
- private final int maxPartitions;
-
- public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
- this.numPartitions = numPartitions;
- this.maxPartitions = maxPartitions;
- }
-
- @Override
- public Integer map(Integer value) throws Exception {
- // validate that the partitioning is identical
- int partition = value % numPartitions;
- myPartitions.add(partition);
- if (myPartitions.size() > maxPartitions) {
- throw new Exception("Error: Elements from too many different partitions: " + myPartitions
- + ". Expect elements only from " + maxPartitions + " partitions");
- }
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
deleted file mode 100644
index 12e3460..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-/**
- * Exception that is thrown to terminate a program and indicate success.
- */
-public class SuccessException extends Exception {
- private static final long serialVersionUID = -7011865671593955887L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
deleted file mode 100644
index 1d61229..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-/**
- * An identity map function that sleeps between elements, throttling the
- * processing speed.
- *
- * @param <T> The type mapped.
- */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
-
- private static final long serialVersionUID = 467008933767159126L;
-
- private final int sleep;
-
- public ThrottledMapper(int sleep) {
- this.sleep = sleep;
- }
-
- @Override
- public T map(T value) throws Exception {
- Thread.sleep(this.sleep);
- return value;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index b762e21..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-import java.io.Serializable;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
- */
-public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final int expectedPartitions;
-
-
- public Tuple2Partitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Object key, int numPartitions) {
- if (numPartitions != expectedPartitions) {
- throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
- }
- @SuppressWarnings("unchecked")
- Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
-
- return element.f0;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
deleted file mode 100644
index f3cc4fa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-
-public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
-
- private static final long serialVersionUID = 1748426382527469932L;
-
- private final int numElementsTotal;
-
- private BitSet duplicateChecker = new BitSet(); // this is checkpointed
-
- private int numElements; // this is checkpointed
-
-
- public ValidatingExactlyOnceSink(int numElementsTotal) {
- this.numElementsTotal = numElementsTotal;
- }
-
-
- @Override
- public void invoke(Integer value) throws Exception {
- numElements++;
-
- if (duplicateChecker.get(value)) {
- throw new Exception("Received a duplicate");
- }
- duplicateChecker.set(value);
- if (numElements == numElementsTotal) {
- // validate
- if (duplicateChecker.cardinality() != numElementsTotal) {
- throw new Exception("Duplicate checker has wrong cardinality");
- }
- else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
- throw new Exception("Received sparse sequence");
- }
- else {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
- LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
- return new Tuple2<>(numElements, duplicateChecker);
- }
-
- @Override
- public void restoreState(Tuple2<Integer, BitSet> state) {
- LOG.info("restoring num elements to {}", state.f0);
- this.numElements = state.f0;
- this.duplicateChecker = state.f1;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
deleted file mode 100644
index 9168822..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-nifi</artifactId>
- <name>flink-connector-nifi</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <nifi.version>0.3.0</nifi.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-site-to-site-client</artifactId>
- <version>${nifi.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
deleted file mode 100644
index c8ceb57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.util.Map;
-
-/**
- * <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
- * a FlowFile's content and its attributes so that they can be processed by Flink.
- * </p>
- */
-public interface NiFiDataPacket {
-
- /**
- * @return the contents of a NiFi FlowFile
- */
- byte[] getContent();
-
- /**
- * @return a Map of attributes that are associated with the NiFi FlowFile
- */
- Map<String, String> getAttributes();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
deleted file mode 100644
index 9bb521b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * A function that can create a NiFiDataPacket from an incoming instance of the given type.
- *
- * @param <T>
- */
-public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
-
- NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
deleted file mode 100644
index abc6b35..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-/**
- * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
- * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
- */
-public class NiFiSink<T> extends RichSinkFunction<T> {
-
- private SiteToSiteClient client;
- private SiteToSiteClientConfig clientConfig;
- private NiFiDataPacketBuilder<T> builder;
-
- /**
- * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- * @param builder a builder to produce NiFiDataPackets from incoming data
- */
- public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
- this.clientConfig = clientConfig;
- this.builder = builder;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- }
-
- @Override
- public void invoke(T value) throws Exception {
- final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
-
- final Transaction transaction = client.createTransaction(TransferDirection.SEND);
- if (transaction == null) {
- throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
- }
-
- transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
- transaction.confirm();
- transaction.complete();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- client.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
deleted file mode 100644
index a213bb4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
- * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
- */
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
-
- private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
-
- private static final long DEFAULT_WAIT_TIME_MS = 1000;
-
- private long waitTimeMs;
- private SiteToSiteClient client;
- private SiteToSiteClientConfig clientConfig;
- private transient volatile boolean running;
-
- /**
- * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- */
- public NiFiSource(SiteToSiteClientConfig clientConfig) {
- this(clientConfig, DEFAULT_WAIT_TIME_MS);
- }
-
- /**
- * Constructs a new NiFiSource using the given client config and wait time.
- *
- * @param clientConfig the configuration for building a NiFi SiteToSiteClient
- * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
- */
- public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
- this.clientConfig = clientConfig;
- this.waitTimeMs = waitTimeMs;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- running = true;
- }
-
- @Override
- public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
- try {
- while (running) {
- final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
- if (transaction == null) {
- LOG.warn("A transaction could not be created, waiting and will try again...");
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
-
- }
- continue;
- }
-
- DataPacket dataPacket = transaction.receive();
- if (dataPacket == null) {
- transaction.confirm();
- transaction.complete();
-
- LOG.debug("No data available to pull, waiting and will try again...");
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
-
- }
- continue;
- }
-
- final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
- do {
- // Read the data into a byte array and wrap it along with the attributes
- // into a NiFiDataPacket.
- final InputStream inStream = dataPacket.getData();
- final byte[] data = new byte[(int) dataPacket.getSize()];
- StreamUtils.fillBuffer(inStream, data);
-
- final Map<String, String> attributes = dataPacket.getAttributes();
-
- niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
- dataPacket = transaction.receive();
- } while (dataPacket != null);
-
- // Confirm transaction to verify the data
- transaction.confirm();
-
- for (NiFiDataPacket dp : niFiDataPackets) {
- ctx.collect(dp);
- }
-
- transaction.complete();
- }
- } finally {
- ctx.close();
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- client.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
deleted file mode 100644
index 5ad4bae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * An implementation of NiFiDataPacket.
- */
-public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
- private static final long serialVersionUID = 6364005260220243322L;
-
- private final byte[] content;
- private final Map<String, String> attributes;
-
- public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
- this.content = content;
- this.attributes = attributes;
- }
-
- @Override
- public byte[] getContent() {
- return content;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return attributes;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
deleted file mode 100644
index 572f949..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
-import org.apache.flink.streaming.connectors.nifi.NiFiSink;
-import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.util.HashMap;
-
-/**
- * An example topology that sends data to a NiFi input port named "Data from Flink".
- */
-public class NiFiSinkTopologyExample {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data from Flink")
- .buildConfig();
-
- DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
- .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
- @Override
- public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
- return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
- }
- }));
-
- env.execute();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
deleted file mode 100644
index 79c9a1c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiSource;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.nio.charset.Charset;
-
-/**
- * An example topology that pulls data from a NiFi output port named "Data for Flink".
- */
-public class NiFiSourceTopologyExample {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("Data for Flink")
- .requestBatchCount(5)
- .buildConfig();
-
- SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
- DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
-
- DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
- @Override
- public String map(NiFiDataPacket value) throws Exception {
- return new String(value.getContent(), Charset.defaultCharset());
- }
- });
-
- dataStream.print();
- env.execute();
- }
-
-}