Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/28 15:36:30 UTC
[4/7] flink git commit: [FLINK-1874] [streaming] Connectors separated
into individual projects
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
new file mode 100644
index 0000000..7b7bdcc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
@@ -0,0 +1,1023 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import scala.collection.Seq;
+
+/**
+ * Code in this test is based on the following GitHub repository
+ * (as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef):
+ * <p/>
+ * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
+ */
+public class KafkaITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
+ private static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+ private static int zkPort;
+ private static String kafkaHost;
+
+ private static String zookeeperConnectionString;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+ public static File tmpZkDir;
+ public static List<File> tmpKafkaDirs;
+
+ private static TestingServer zookeeper;
+ private static List<KafkaServer> brokers;
+ private static String brokerConnectionStrings = "";
+
+ private static ConsumerConfig standardCC;
+
+ private static ZkClient zkClient;
+
+
+ @BeforeClass
+ public static void prepare() throws IOException {
+ LOG.info("Starting KafkaITCase.prepare()");
+ tmpZkDir = tempFolder.newFolder();
+
+ tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ tmpKafkaDirs.add(tempFolder.newFolder());
+ }
+
+ kafkaHost = InetAddress.getLocalHost().getHostName();
+ zkPort = NetUtils.getAvailablePort();
+ zookeeperConnectionString = "localhost:" + zkPort;
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ LOG.info("Starting Zookeeper");
+ zookeeper = getZookeeper();
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+ SocketServer socketServer = brokers.get(i).socketServer();
+ String host = "localhost";
+ if (socketServer.host() != null) {
+ host = socketServer.host();
+ }
+ brokerConnectionStrings += host + ":" + socketServer.port() + ",";
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ } catch (Throwable t) {
+ LOG.warn("Test failed with exception", t);
+ Assert.fail("Test failed with: " + t.getMessage());
+ }
+
+ Properties cProps = new Properties();
+ cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ cProps.setProperty("group.id", "flink-tests");
+ cProps.setProperty("auto.commit.enable", "false");
+
+ cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+ standardCC = new ConsumerConfig(cProps);
+
+ zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ LOG.info("Shutting down all services");
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ } catch (IOException e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ }
+ zkClient.close();
+ }
+
+
+ @Test
+ public void testOffsetManipulation() {
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetManipulation";
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
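+ // With the Kafka 0.8 high-level consumer, committed offsets conventionally live in ZooKeeper
+ // under /consumers/<group.id>/offsets/<topic>/<partition>; setOffset()/getOffset() read and
+ // write those znodes directly.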
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+ Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+ zk.close();
+ }
+
+ /**
+ * We want to use the high-level Java consumer API but manage the offsets in ZooKeeper manually.
+ */
+ @Test
+ public void testPersistentSourceWithOffsetUpdates() throws Exception {
+ LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetHacking";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(50);
+ env.setNumberOfExecutionRetries(0);
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+ // write a sequence from 0 to 99 to each of the three partitions.
+ writeSequence(env, topicName, 0, 99);
+
+
+ readSequence(env, standardCC, topicName, 0, 100, 300);
+
+ // Check that the offsets are set at least higher than 50.
+ // Ideally we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending
+ // checkpoints have been committed.
+ // To work around that limitation, the persistent Kafka consumer is throttled with a Thread.sleep().
+ long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
+ long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
+ long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
+ Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
+ Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
+ Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+ /** Once we have proper shutdown of streaming jobs, enable these tests
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
+
+
+ LOG.info("Manipulating offsets");
+ // set the offset of all three partitions to 50
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+ // create new env
+ env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ readSequence(env, standardCC, topicName, 50, 50, 150);
+
+ zk.close();
+
+ LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+ }
+
+ private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+ LOG.info("Reading sequence for verification until final count {}", finalCount);
+ DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+ new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+ )
+ // add a sleeping mapper. Since there is no good way of "shutting down" a running topology, we have
+ // to play this trick: the job must keep running until all pending checkpoints are confirmed.
+ .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ Thread.sleep(150);
+ return value;
+ }
+ }).setParallelism(3);
+
+ // verify data
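+ // Each of the three writer subtasks emits the full sequence into its own partition, so every
+ // value must be seen exactly three times once finalCount elements have arrived.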
+ DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+ int[] values = new int[valuesCount];
+ int count = 0;
+
+ @Override
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+ values[value.f1 - valuesStartFrom]++;
+ count++;
+
+ LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+ // verify if we've seen everything
+
+ if (count == finalCount) {
+ LOG.info("Received all values");
+ for (int i = 0; i < values.length; i++) {
+ int v = values[i];
+ if (v != 3) {
+ LOG.warn("Test is going to fail");
+ printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
+ throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+ }
+ }
+ // test has passed
+ throw new SuccessException();
+ }
+ }
+
+ }).setParallelism(1);
+
+ tryExecute(env, "Read data from Kafka");
+
+ LOG.info("Successfully read sequence for verification");
+ }
+
+
+
+ private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+ LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+
+ DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = from;
+ int partition;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt > to;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> next() throws Exception {
+ LOG.info("Writing " + cnt + " to partition " + partition);
+ Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt);
+ cnt++;
+ return result;
+ }
+ }).setParallelism(3);
+
+ stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+ topicName,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+ new T2Partitioner()
+ )).setParallelism(3);
+ env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+ LOG.info("Finished writing sequence");
+ }
+
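+ // Routes each tuple to the partition given by its f0 field. writeSequence() sets f0 to the
+ // producing subtask's index, so with parallelism 3 each partition is written by exactly one subtask.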
+ private static class T2Partitioner implements SerializableKafkaPartitioner {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ if (numPartitions != 3) {
+ throw new IllegalArgumentException("Expected three partitions");
+ }
+ @SuppressWarnings("unchecked")
+ Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+ return element.f0;
+ }
+ }
+
+
+ @Test
+ public void regularKafkaSourceTest() throws Exception {
+ LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+ String topic = "regularKafkaSourceTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.debug("Got value = " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+ tryExecute(env, "regular kafka source test");
+
+ LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
+ }
+
+ @Test
+ public void tupleTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+ String topic = "tupleTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC
+ ));
+ consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.info("Got value " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ Assert.assertTrue("No element received", elCnt > 0);
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+ tryExecute(env, "tupletesttopology");
+
+ LOG.info("Finished KafkaITCase.tupleTestTopology()");
+ }
+
+ /**
+ * Tests Flink's Kafka integration with very big records (up to 30 MB).
+ *
+ * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+ *
+ * @throws Exception
+ */
+ @Test
+ public void bigRecordTestTopology() throws Exception {
+
+ LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
+
+ String topic = "bigRecordTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+ Properties consumerProps = new Properties();
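+ // a 30MB record only goes through because all three size limits are raised in concert:
+ // fetch.message.max.bytes here on the consumer, plus message.max.bytes and
+ // replica.fetch.max.bytes on the brokers (see getKafkaServer())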
+ consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
+ consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ consumerProps.setProperty("group.id", "test");
+ consumerProps.setProperty("auto.commit.enable", "false");
+ consumerProps.setProperty("auto.offset.reset", "smallest");
+
+ ConsumerConfig cc = new ConsumerConfig(consumerProps);
+ DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
+
+ consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+ int elCnt = 0;
+
+ @Override
+ public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ LOG.info("Received {}", value.f0);
+ elCnt++;
+ if (value.f0 == -1) {
+ // we should have seen 11 elements by now.
+ if (elCnt == 11) {
+ throw new SuccessException();
+ } else {
+ throw new RuntimeException("There have been " + elCnt + " elements");
+ }
+ }
+ if (elCnt > 10) {
+ throw new RuntimeException("More than 10 elements seen: " + elCnt);
+ }
+ }
+ }).setParallelism(1);
+
+ // add producing topology
+ DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+ long cnt;
+ transient Random rnd;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ cnt = 0;
+ rnd = new Random(1337);
+
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt > 10;
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next() throws Exception {
+ Thread.sleep(100);
+
+ if (cnt < 10) {
+ byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
+ Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(cnt++, wl);
+ LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
+ return result;
+
+ } else if (cnt == 10) {
+ Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(-1L, new byte[]{1});
+ cnt++;
+ return result;
+ } else {
+ throw new RuntimeException("Source is exhausted.");
+ }
+ }
+ });
+
+ stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+ );
+
+ tryExecute(env, "big topology test");
+
+ LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
+ }
+
+
+ @Test
+ public void customPartitioningTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+ String topic = "customPartitioningTestTopic";
+
+ createTestTopic(topic, 3, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ boolean gotPartition1 = false;
+ boolean gotPartition2 = false;
+ boolean gotPartition3 = false;
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.debug("Got " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ switch (v) {
+ case 9:
+ gotPartition1 = true;
+ break;
+ case 19:
+ gotPartition2 = true;
+ break;
+ case 99:
+ gotPartition3 = true;
+ break;
+ }
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+
+ if (gotPartition1 && gotPartition2 && gotPartition3) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
+
+ tryExecute(env, "custom partitioning test");
+
+ LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+ }
+
+ /**
+ * This is for a topic with 3 partitions and Tuple2<Long, String>
+ */
+ private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+ @Override
+ public int partition(Object key, int numPartitions) {
+
+ @SuppressWarnings("unchecked")
+ Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+ if (tuple.f0 < 10) {
+ return 0;
+ } else if (tuple.f0 < 20) {
+ return 1;
+ } else {
+ return 2;
+ }
+ }
+ }
+
+
+ @Test
+ public void simpleTestTopology() throws Exception {
+ String topic = "simpleTestTopic";
+
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<String> consuming = env.addSource(
+ new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+ consuming.addSink(new SinkFunction<String>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(String value) throws Exception {
+ LOG.debug("Got " + value);
+ String[] sp = value.split("-");
+ int v = Integer.parseInt(sp[1]);
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public String next() throws Exception {
+ Thread.sleep(100);
+ return "kafka-" + cnt++;
+ }
+ });
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
+
+ tryExecute(env, "simpletest");
+ }
+
+ private static boolean leaderHasShutDown = false;
+ private static boolean shutdownKafkaBroker;
+
+ @Test(timeout=60000)
+ public void brokerFailureTest() throws Exception {
+ String topic = "brokerFailureTestTopic";
+
+ createTestTopic(topic, 2, 2);
+
+ // --------------------------- write data to topic ---------------------
+ LOG.info("Writing data to topic {}", topic);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+
+ private int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt == 200;
+ }
+
+ @Override
+ public String next() throws Exception {
+ String msg = "kafka-" + cnt++;
+ LOG.info("sending message = "+msg);
+
+ if ((cnt - 1) % 20 == 0) {
+ LOG.debug("Sending message #{}", cnt - 1);
+ }
+
+ return msg;
+ }
+
+ });
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
+ .setParallelism(1);
+
+ tryExecute(env, "broker failure test - writer");
+
+ // --------------------------- read and let broker fail ---------------------
+
+ LOG.info("Reading data from topic {} and let a broker fail", topic);
+ PartitionMetadata firstPart = null;
+ do {
+ if (firstPart != null) {
+ LOG.info("Unable to find leader. Error code: {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
+ }
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ } while (firstPart.errorCode() != 0);
+
+ final String leaderToShutDown = firstPart.leader().get().connectionString();
+ LOG.info("Leader to shutdown {}", leaderToShutDown);
+
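+ // Coordination happens via static flags: the sink below sets shutdownKafkaBroker after 20
+ // elements, this thread then kills the partition leader and sets leaderHasShutDown for the
+ // final check. Static fields work here because the whole topology runs in one local JVM.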
+ final Thread brokerShutdown = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ shutdownKafkaBroker = false;
+ while (!shutdownKafkaBroker) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.warn("Interruption", e);
+ }
+ }
+
+ for (KafkaServer kafkaServer : brokers) {
+ if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+ LOG.info("Killing Kafka Server {}", leaderToShutDown);
+ kafkaServer.shutdown();
+ leaderHasShutDown = true;
+ break;
+ }
+ }
+ }
+ });
+ brokerShutdown.start();
+
+ // add consuming topology:
+ DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+ consuming.setParallelism(1);
+
+ consuming.addSink(new SinkFunction<String>() {
+ int elCnt = 0;
+ int start = 0;
+ int numOfMessagesToBeCorrect = 100;
+ int stopAfterMessages = 150;
+
+ BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
+
+ @Override
+ public void invoke(String value) throws Exception {
+ LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
+ String[] sp = value.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ if (start == -1) {
+ start = v;
+ }
+ int offset = v - start;
+ Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
+ if (v - start < 0 && LOG.isWarnEnabled()) {
+ LOG.warn("Not in order: {}", value);
+ }
+
+ validator.set(offset);
+ elCnt++;
+ if (elCnt == 20) {
+ LOG.info("Asking leading broker to shut down");
+ // shut down a Kafka broker
+ shutdownKafkaBroker = true;
+ }
+ if (shutdownKafkaBroker) {
+ // we become a bit slower because the shutdown takes some time and we have
+ // only a fixed number of elements to read
+ Thread.sleep(20);
+ }
+ if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
+ if (elCnt >= stopAfterMessages) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ }
+ });
+ tryExecute(env, "broker failure test - reader");
+
+ }
+
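+ // The streaming jobs in these tests run until a sink or mapper throws SuccessException. Flink
+ // wraps user exceptions into a JobExecutionException, so "success" means finding a
+ // SuccessException somewhere in the cause chain (bounded to 20 hops).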
+ public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ if (t == null) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+ }
+
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+ }
+ }
+ }
+ }
+
+ private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topic);
+ AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
+ }
+
+ private static TestingServer getZookeeper() throws Exception {
+ return new TestingServer(zkPort, tmpZkDir);
+ }
+
+ /**
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ */
+ private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
+ Properties kafkaProperties = new Properties();
+
+ int kafkaPort = NetUtils.getAvailablePort();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", kafkaHost);
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+ server.startup();
+ return server;
+ }
+
+ public static class SuccessException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+
+ // ----------------------- Debugging utilities --------------------
+
+ /**
+ * Reads the given topic into a list, using only Kafka code.
+ *
+ * @return the messages read from the topic, at most {@code stopAfter} of them
+ */
+ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+ ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+ // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+ // will see each message only once.
+ Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+ if (streams.size() != 1) {
+ throw new RuntimeException("Expected only one message stream but got " + streams.size());
+ }
+ List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ if (kafkaStreams == null) {
+ throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
+ }
+ if (kafkaStreams.size() != 1) {
+ throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
+ }
+ LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+ ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+ List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+ int read = 0;
+ while (iteratorToRead.hasNext()) {
+ read++;
+ result.add(iteratorToRead.next());
+ if (read == stopAfter) {
+ LOG.info("Read " + read + " elements");
+ return result;
+ }
+ }
+ return result;
+ }
+
+ private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
+ List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+ LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+ for(MessageAndMetadata<byte[], byte[]> message: contents) {
+ Object out = deserializationSchema.deserialize(message.message());
+ LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+ }
+ }
+
+ private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
+ // write the sequence to log for debugging purposes
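+ // a fresh, random group.id plus auto.offset.reset=smallest makes the printer re-read the topic
+ // from the beginning, independent of any offsets the test's consumer group has committed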
+ Properties stdProps = standardCC.props().props();
+ Properties newProps = new Properties(stdProps);
+ newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
+ newProps.setProperty("auto.offset.reset", "smallest");
+ newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+ ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+ DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
+ printTopic(topicName, printerConfig, deserializer, elements);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..18fa46f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.utils.Time;
+
+public class KafkaLocalSystemTime implements Time {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ LOG.warn("Interruption", e);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9ede613
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
new file mode 100644
index 0000000..ebb0af9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-rabbitmq</artifactId>
+ <name>flink-connector-rabbitmq</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <rabbitmq.version>3.3.1</rabbitmq.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
new file mode 100644
index 0000000..fa729d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RMQSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
+
+ private String QUEUE_NAME;
+ private String HOST_NAME;
+ private transient ConnectionFactory factory;
+ private transient Connection connection;
+ private transient Channel channel;
+ private SerializationSchema<IN, byte[]> schema;
+
+ public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
+ this.HOST_NAME = HOST_NAME;
+ this.QUEUE_NAME = QUEUE_NAME;
+ this.schema = schema;
+ }
+
+ /**
+ * Initializes the connection to RMQ.
+ */
+ public void initializeConnection() {
+ factory = new ConnectionFactory();
+ factory.setHost(HOST_NAME);
+ try {
+ connection = factory.newConnection();
+ channel = connection.createChannel();
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to RMQ.
+ *
+ * @param value
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN value) {
+ try {
+ byte[] msg = schema.serialize(value);
+
+ channel.basicPublish("", QUEUE_NAME, null, msg);
+
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ }
+ }
+
+ }
+
+ /**
+ * Closes the connection.
+ */
+ private void closeChannel() {
+ try {
+ channel.close();
+ connection.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ + " at " + HOST_NAME, e);
+ }
+
+ }
+
+ @Override
+ public void open(Configuration config) {
+ initializeConnection();
+ }
+
+ @Override
+ public void close() {
+ closeChannel();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
new file mode 100644
index 0000000..a4c833e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+public class RMQSource<OUT> extends ConnectorSource<OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
+
+ private final String QUEUE_NAME;
+ private final String HOST_NAME;
+
+ private transient ConnectionFactory factory;
+ private transient Connection connection;
+ private transient Channel channel;
+ private transient QueueingConsumer consumer;
+ private transient QueueingConsumer.Delivery delivery;
+
+ private volatile boolean isRunning = false;
+
+ OUT out;
+
+ public RMQSource(String HOST_NAME, String QUEUE_NAME,
+ DeserializationSchema<OUT> deserializationSchema) {
+ super(deserializationSchema);
+ this.HOST_NAME = HOST_NAME;
+ this.QUEUE_NAME = QUEUE_NAME;
+ }
+
+ /**
+ * Initializes the connection to RMQ.
+ */
+ private void initializeConnection() {
+ factory = new ConnectionFactory();
+ factory.setHost(HOST_NAME);
+ try {
+ connection = factory.newConnection();
+ channel = connection.createChannel();
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+ consumer = new QueueingConsumer(channel);
+ channel.basicConsume(QUEUE_NAME, true, consumer);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
+ + HOST_NAME, e);
+ }
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ initializeConnection();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ try {
+ connection.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ + " at " + HOST_NAME, e);
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ if (out != null) {
+ // an element is already buffered, so the stream is not exhausted
+ return false;
+ }
+ try {
+ delivery = consumer.nextDelivery();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot receive RMQ message from queue {} at {}", QUEUE_NAME, HOST_NAME, e);
+ }
+ }
+
+ out = schema.deserialize(delivery.getBody());
+ if (schema.isEndOfStream(out)) {
+ out = null;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public OUT next() throws Exception {
+ if (out != null) {
+ OUT result = out;
+ out = null;
+ return result;
+ }
+
+ try {
+ delivery = consumer.nextDelivery();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ }
+ }
+
+ out = schema.deserialize(delivery.getBody());
+ if (schema.isEndOfStream(out)) {
+ throw new RuntimeException("RMQ source is at end.");
+ }
+ OUT result = out;
+ out = null;
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
new file mode 100644
index 0000000..0f06235
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+public class RMQTopology {
+
+ public static void main(String[] args) throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ @SuppressWarnings("unused")
+ DataStream<String> dataStream1 = env.addSource(
+ new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
+
+ @SuppressWarnings("unused")
+ DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+ "q").addSink(
+ new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+
+ env.execute();
+ }
+
+ public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..871741e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-twitter</artifactId>
+ <name>flink-connector-twitter</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>hbc-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.json</artifactId>
+ <version>2.0.6</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <!-- We include all dependencies that transitively depend on guava -->
+ <include>com.twitter:hbc-core</include>
+ <include>com.twitter:joauth</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..0f16541
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract class derived from {@link RichFlatMapFunction} to handle JSON strings.
+ *
+ * @param <IN>
+ * Type of the input elements.
+ * @param <OUT>
+ * Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Get the value object associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public Object get(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).get("retValue");
+ }
+
+ /**
+ * Get the boolean value associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The boolean value associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public boolean getBoolean(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getBoolean("retValue");
+ }
+
+ /**
+ * Get the double value associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The double value associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public double getDouble(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getDouble("retValue");
+ }
+
+ /**
+ * Get the int value associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The int value associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public int getInt(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getInt("retValue");
+ }
+
+ /**
+ * Get the long value associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The long value associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public long getLong(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getLong("retValue");
+ }
+
+ /**
+ * Get the String value associated with a key from a JSON string. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The String value associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public String getString(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getString("retValue");
+ }
+}
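
A minimal sketch of a concrete subclass using the accessors above (the field name "user.lang" is an assumption chosen for illustration; SelectDataFlatMap in TwitterStreaming further below shows the connector's real usage):

    import org.apache.flink.util.Collector;

    public class LanguageExtractor extends JSONParseFlatMap<String, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // getString resolves dotted paths into nested JSON objects
            out.collect(getString(value, "user.lang"));
        }
    }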
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..c1eabbd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser wraps a JSONObject and provides access to embedded fields
+ * of a JSON string.
+ */
+public class JSONParser {
+
+ private JSONObject originalJO;
+ private String searchedfield;
+ private Object temp;
+
+ /**
+ * Construct a JSONParser from a string. The string has to be valid JSON
+ * from which we want to read a field.
+ *
+ * @param jsonText
+ * String representation of the JSON code to parse.
+ * @throws JSONException
+ * If there is a syntax error in the source string.
+ */
+ public JSONParser(String jsonText) throws JSONException {
+ originalJO = new JSONObject(jsonText);
+ }
+
+ /**
+ * Parse the JSON code passed to the constructor to find the given key.
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @return A JSONObject which has only one field called "retValue" and the
+ * value associated to it is the searched value. The methods of
+ * JSONObject can be used to get the field value in a desired
+ * format.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ public JSONObject parse(String key) throws JSONException {
+ initializeParser(key);
+ parsing();
+ return putResultInJSONObj();
+ }
+
+ /**
+ * Prepare the fields of the class for the parsing.
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void initializeParser(String key) throws JSONException {
+ searchedfield = key;
+ temp = new JSONObject(originalJO.toString());
+ }
+
+ /**
+ * Walks through the searched field and calls the appropriate functions
+ * to handle the tokens between the separating dots.
+ *
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void parsing() throws JSONException {
+ StringTokenizer st = new StringTokenizer(searchedfield, ".");
+ while (st.hasMoreTokens()) {
+ find(st.nextToken());
+ }
+ }
+
+ /**
+ * Search for the next part of the field and update the state if it was
+ * found.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void find(String nextToken) throws JSONException {
+ if (endsWithBracket(nextToken)) {
+ treatAllBracket(nextToken);
+ } else {
+ temp = ((JSONObject) temp).get(nextToken);
+ }
+ }
+
+ /**
+ * Determine whether the given string ends with a closing square bracket ']'
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @return True if the given string ends with a closing square bracket ']'
+ * and false otherwise.
+ */
+ private boolean endsWithBracket(String nextToken) {
+ return nextToken.endsWith("]");
+ }
+
+ /**
+ * Handle (multidimensional) arrays. Treat the square bracket pairs one
+ * after the other if necessary.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the searched element is not found.
+ */
+ private void treatAllBracket(String nextToken) throws JSONException {
+ List<String> list = Arrays.asList(nextToken.split("\\["));
+ ListIterator<String> iter = list.listIterator();
+
+ temp = ((JSONObject) temp).get(iter.next());
+
+ while (iter.hasNext()) {
+ int index = Integer.parseInt(cutBracket(iter.next()));
+ temp = ((JSONArray) temp).get(index);
+ }
+ }
+
+ /**
+ * Remove the last character of the string.
+ *
+ * @param string
+ * String to modify.
+ * @return The given string without the last character.
+ */
+ private String cutBracket(String string) {
+ return string.substring(0, string.length() - 1);
+ }
+
+ /**
+ * Save the result of the search into a JSONObject.
+ *
+ * @return A special JSONObject which contain only one key. The value
+ * associated to this key is the result of the search.
+ * @throws JSONException
+ * If there is a problem creating the JSONObject. (e.g. invalid
+ * syntax)
+ */
+ private JSONObject putResultInJSONObj() throws JSONException {
+ JSONObject jo = new JSONObject();
+ jo.put("retValue", temp);
+ return jo;
+ }
+
+}
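
A quick sketch of the path syntax accepted by parse() (the input string is invented for illustration): dots descend into nested objects, "[n]" indexes into arrays, and the result is always wrapped under the single key "retValue":

    JSONParser parser = new JSONParser("{\"user\":{\"langs\":[\"en\",\"de\"]}}");
    String second = parser.parse("user.langs[1]").getString("retValue"); // returns "de"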
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..78e4aa5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. It can connect to the Twitter Streaming API and collect tweets,
+ * either indefinitely or up to a configured maximum number.
+ */
+public class TwitterSource extends RichParallelSourceFunction<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+ private static final long serialVersionUID = 1L;
+ private String authPath;
+ private transient BlockingQueue<String> queue;
+ private int queueSize = 10000;
+ private transient BasicClient client;
+ private int waitSec = 5;
+
+ private int maxNumberOfTweets;
+ private int currentNumberOfTweets;
+
+ private String nextElement = null;
+
+ private volatile boolean isRunning = false;
+
+ /**
+ * Create {@link TwitterSource} for an unbounded stream of tweets.
+ *
+ * @param authPath
+ * Location of the properties file containing the required
+ * authentication information.
+ */
+ public TwitterSource(String authPath) {
+ this.authPath = authPath;
+ maxNumberOfTweets = -1;
+ }
+
+ /**
+ * Create {@link TwitterSource} to collect a finite number of tweets.
+ *
+ * @param authPath
+ * Location of the properties file containing the required
+ * authentication information.
+ * @param numberOfTweets
+ * The maximum number of tweets to collect before the source
+ * reaches its end.
+ */
+ public TwitterSource(String authPath, int numberOfTweets) {
+ this.authPath = authPath;
+ this.maxNumberOfTweets = numberOfTweets;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ initializeConnection();
+ currentNumberOfTweets = 0;
+ }
+
+ /**
+ * Initialize Hosebird Client to be able to consume Twitter's Streaming API
+ */
+ private void initializeConnection() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initializing Twitter Streaming API connection");
+ }
+
+ queue = new LinkedBlockingQueue<String>(queueSize);
+
+ StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+ endpoint.stallWarnings(false);
+
+ Authentication auth = authenticate();
+
+ initializeClient(endpoint, auth);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Twitter Streaming API connection established successfully");
+ }
+ }
+
+ private OAuth1 authenticate() {
+
+ Properties authenticationProperties = loadAuthenticationProperties();
+
+ return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+ authenticationProperties.getProperty("consumerSecret"),
+ authenticationProperties.getProperty("token"),
+ authenticationProperties.getProperty("secret"));
+ }
+
+ /**
+ * Reads the given properties file for the authentication data.
+ *
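+ * The file is a plain Java properties file that must define the four OAuth
+ * values consumed by authenticate() above: consumerKey, consumerSecret,
+ * token and secret.
+ *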
+ * @return the authentication data.
+ */
+ private Properties loadAuthenticationProperties() {
+ Properties properties = new Properties();
+ try {
+ InputStream input = new FileInputStream(authPath);
+ properties.load(input);
+ input.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot open .properties file: " + authPath, e);
+ }
+ return properties;
+ }
+
+ private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
+
+ client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+ .endpoint(endpoint).authentication(auth)
+ .processor(new StringDelimitedProcessor(queue)).build();
+
+ client.connect();
+ }
+
+ /**
+ * Put tweets into output
+ *
+ * @param collector
+ * Collector in which the tweets are collected.
+ */
+ protected void collectFiniteMessages(Collector<String> collector) {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Collecting tweets");
+ }
+
+ for (int i = 0; i < maxNumberOfTweets; i++) {
+ collectOneMessage(collector);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Collecting tweets finished");
+ }
+ }
+
+ /**
+ * Put tweets into output
+ *
+ * @param collector
+ * Collector in which the tweets are collected.
+ */
+ protected void collectMessages(Collector<String> collector) {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Tweet-stream begins");
+ }
+
+ while (isRunning) {
+ collectOneMessage(collector);
+ }
+ }
+
+ /**
+ * Put one tweet into the output.
+ *
+ * @param collector
+ * Collector in which the tweets are collected.
+ */
+ protected void collectOneMessage(Collector<String> collector) {
+ if (client.isDone()) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+ .getMessage());
+ }
+ }
+
+ try {
+ String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+ if (msg != null) {
+ collector.collect(msg);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Did not receive a message in {} seconds", waitSec);
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+ }
+
+ }
+
+ private void closeConnection() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initiating connection close");
+ }
+
+ client.stop();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Connection closed successfully");
+ }
+ }
+
+ /**
+ * Get the size of the queue in which the tweets are contained temporarily.
+ *
+ * @return the size of the queue in which the tweets are contained
+ * temporarily
+ */
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ /**
+ * Set the size of the queue in which the tweets are contained temporarily.
+ *
+ * @param queueSize
+ * The desired value.
+ */
+ public void setQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ /**
+ * This function tells how long TwitterSource waits for new tweets.
+ *
+ * @return The number of seconds to wait for a tweet.
+ */
+ public int getWaitSec() {
+ return waitSec;
+ }
+
+ /**
+ * This function sets how long TwitterSource should wait for new tweets.
+ *
+ * @param waitSec
+ * The desired value.
+ */
+ public void setWaitSec(int waitSec) {
+ this.waitSec = waitSec;
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ if (maxNumberOfTweets >= 0 && currentNumberOfTweets >= maxNumberOfTweets) {
+ // The configured number of tweets has been collected.
+ return true;
+ }
+
+ if (nextElement != null) {
+ // An element is already buffered from a previous call.
+ return false;
+ }
+ if (client.isDone()) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+ .getMessage());
+ }
+ return true;
+ }
+
+ try {
+ String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+ if (msg != null) {
+ nextElement = msg;
+ return false;
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Did not receive a message in {} seconds", waitSec);
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+ }
+ return true;
+ }
+
+ @Override
+ public String next() throws Exception {
+ if (nextElement != null) {
+ String result = nextElement;
+ nextElement = null;
+ currentNumberOfTweets++;
+ return result;
+ }
+ if (reachedEnd()) {
+ throw new RuntimeException("Twitter stream end reached.");
+ } else {
+ String result = nextElement;
+ nextElement = null;
+ currentNumberOfTweets++;
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..e500fef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
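+/**
+ * Example job that collects a fixed number of tweets with {@link TwitterSource},
+ * extracts a few fields from each tweet's JSON with {@link SelectDataFlatMap}
+ * and prints them through {@link TwitterSink}.
+ */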
+public class TwitterStreaming {
+
+ private static final int PARALLELISM = 1;
+ private static final int SOURCE_PARALLELISM = 1;
+ private static final int NUMBER_OF_TWEETS = 100;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
+
+ public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
+ System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
+ System.out.println("NAME: " + tuple.f4);
+ System.out.println("TEXT: " + tuple.f3);
+ System.out.println("");
+ }
+
+ }
+
+ public static class SelectDataFlatMap extends
+ JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
+ throws Exception {
+ try {
+ out.collect(new Tuple5<Long, Integer, String, String, String>(
+ getLong(value, "id"),
+ getInt(value, "entities.hashtags[0].indices[1]"),
+ getString(value, "lang"),
+ getString(value, "text"),
+ getString(value, "user.name")));
+ } catch (JSONException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Field not found");
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args == null || args.length != 1) {
+ System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
+ return;
+ }
+ String path = args[0];
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(PARALLELISM);
+
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+ .setParallelism(SOURCE_PARALLELISM);
+
+ DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
+ .flatMap(new SelectDataFlatMap());
+
+ selectedDataStream.addSink(new TwitterSink());
+
+ env.execute();
+ }
+}