You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:59 UTC
[22/51] [abbrv] git commit: [streaming] Added Flume connector and
updated connectors
[streaming] Added Flume connector and updated connectors
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a06f9d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a06f9d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a06f9d10
Branch: refs/heads/master
Commit: a06f9d10e6ea9f91dc6723af483ba9328d4e1621
Parents: 7ca778b
Author: jfeher <fe...@gmail.com>
Authored: Tue Jul 29 13:51:34 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 11 +-
.../streaming/connectors/flume/FlumeSink.java | 129 +++++++++++++++++++
.../streaming/connectors/flume/FlumeSource.java | 114 ++++++++++++++++
.../connectors/flume/FlumeTopology.java | 88 +++++++++++++
.../connectors/kafka/KafkaProducer.java | 75 -----------
.../streaming/connectors/kafka/KafkaSink.java | 24 +++-
.../streaming/connectors/kafka/KafkaSource.java | 34 ++---
.../connectors/kafka/KafkaTopology.java | 38 +++---
.../streaming/connectors/rabbitmq/RMQSink.java | 74 +++++------
.../connectors/rabbitmq/RMQSource.java | 52 ++++----
.../connectors/rabbitmq/RMQTopology.java | 29 ++---
11 files changed, 472 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index ac15d55..8db610f 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -48,14 +48,19 @@ under the License.
<version>0.8.0</version>
</dependency>
-
- <dependency>
+ <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..27ce546
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+
+/**
+ * Sink that forwards every incoming tuple to a Flume agent as one Flume event.
+ * Subclasses define the wire format by implementing {@link #serialize(Tuple)}.
+ *
+ * The RPC connection is opened lazily on the first call to
+ * {@link #invoke(Tuple)}. Shutdown is requested through
+ * {@link #sendAndClose()} or {@link #closeWithoutSend()}.
+ */
+public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	/** Number of connection attempts made before giving up. */
+	private static final int MAX_CONNECT_ATTEMPTS = 90;
+	/** Pause between two connection attempts, in milliseconds. */
+	private static final long CONNECT_RETRY_DELAY_MS = 1000;
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	private boolean sendAndClose = false;
+	private boolean closeWithoutSend = false;
+
+	/**
+	 * @param host
+	 *            host name the Flume RPC client connects to
+	 * @param port
+	 *            port the Flume RPC client connects to
+	 */
+	public FlumeSink(String host, int port) {
+		this.host = host;
+		this.port = port;
+	}
+
+	/**
+	 * Serializes the tuple and sends it to Flume unless
+	 * {@link #closeWithoutSend()} was requested; closes the connection
+	 * afterwards if {@link #sendAndClose()} was requested.
+	 *
+	 * @param tuple
+	 *            the tuple to forward
+	 * @throws RuntimeException
+	 *             if the initial connection cannot be established
+	 */
+	@Override
+	public void invoke(IN tuple) {
+
+		// The facade is transient while initDone is not, so also reconnect
+		// when the facade is missing although initDone is already set.
+		if (!initDone || client == null) {
+			client = new FlinkRpcClientFacade();
+			client.init(host, port);
+		}
+
+		// serialize() runs first because subclasses may request a shutdown
+		// from inside it.
+		byte[] data = serialize(tuple);
+		if (!closeWithoutSend) {
+			client.sendDataToFlume(data);
+		}
+		if (sendAndClose) {
+			client.close();
+		}
+
+	}
+
+	/**
+	 * Converts a tuple into the byte body of the Flume event.
+	 *
+	 * @param tuple
+	 *            the tuple to convert
+	 * @return the event body
+	 */
+	public abstract byte[] serialize(IN tuple);
+
+	/** Thin wrapper around the Flume {@link RpcClient}. */
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		/**
+		 * Opens the RPC connection, retrying up to
+		 * {@link FlumeSink#MAX_CONNECT_ATTEMPTS} times with a pause of
+		 * {@link FlumeSink#CONNECT_RETRY_DELAY_MS} between attempts.
+		 *
+		 * @throws RuntimeException
+		 *             if no connection could be established or the thread was
+		 *             interrupted while waiting for the next attempt
+		 */
+		public void init(String hostname, int port) {
+			// Setup the RPC connection
+			this.hostname = hostname;
+			this.port = port;
+
+			FlumeException lastFailure = null;
+			for (int attempt = 0; attempt < MAX_CONNECT_ATTEMPTS && this.client == null; attempt++) {
+				try {
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				} catch (FlumeException e) {
+					lastFailure = e;
+					// Wait before the next try
+					try {
+						Thread.sleep(CONNECT_RETRY_DELAY_MS);
+					} catch (InterruptedException interrupted) {
+						// Keep the interrupt visible to the caller and stop retrying.
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while connecting to Flume at "
+								+ hostname + ":" + port, interrupted);
+					}
+				}
+			}
+
+			if (this.client == null) {
+				// Fail this sink only; never terminate the whole JVM from library code.
+				throw new RuntimeException("Cannot connect to Flume at " + hostname + ":" + port
+						+ " after " + MAX_CONNECT_ATTEMPTS + " attempts", lastFailure);
+			}
+			initDone = true;
+		}
+
+		/**
+		 * Wraps the bytes into an event and appends it to the RPC client. If
+		 * the delivery fails the failure is reported, the event is not
+		 * re-sent, and the client is recreated for later events.
+		 */
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+
+			if (client == null) {
+				// A previous recreation attempt failed; try again before sending.
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+
+			try {
+				client.append(event);
+
+			} catch (EventDeliveryException e) {
+				System.err.println("Could not deliver event to Flume at " + hostname + ":" + port
+						+ ": " + e);
+				// clean up and recreate the client
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+
+		/** Closes the RPC client if one is open. */
+		public void close() {
+			if (client != null) {
+				client.close();
+			}
+		}
+
+	}
+
+	/**
+	 * Requests that the connection is closed after the tuple currently being
+	 * processed has been sent.
+	 */
+	public void sendAndClose() {
+		sendAndClose = true;
+	}
+
+	/**
+	 * Closes the connection immediately (if one is open) and suppresses the
+	 * sending of the tuple currently being processed.
+	 */
+	public void closeWithoutSend(){
+		if (client != null) {
+			client.close();
+		}
+		closeWithoutSend=true;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..966c881
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AvroSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.Status;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Source that runs an embedded Flume Avro source and emits one tuple per
+ * received Flume event. Subclasses define how an event body becomes a tuple
+ * by implementing {@link #deserialize(byte[])}.
+ *
+ * {@link #invoke(Collector)} blocks until {@link #closeWithoutSend()} is
+ * called, or until an event has been processed after {@link #sendAndClose()}
+ * was called.
+ */
+public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	/** Interval at which invoke() re-checks the shutdown flags, in milliseconds. */
+	private static final long SHUTDOWN_POLL_INTERVAL_MS = 50;
+
+	String host;
+	String port;
+
+	/**
+	 * @param host
+	 *            address the Avro source binds to
+	 * @param port
+	 *            port the Avro source listens on
+	 */
+	public FlumeSource(String host, int port){
+		this.host = host;
+		// The Flume Context only stores string values.
+		this.port = Integer.toString(port);
+	}
+
+	/**
+	 * Avro source that hands received events to the Flink collector instead
+	 * of passing them on to a Flume channel.
+	 */
+	public class MyAvroSource extends AvroSource {
+		Collector<IN> collector;
+
+		@Override
+		public Status append(AvroFlumeEvent avroEvent) {
+			collect(avroEvent);
+			return Status.OK;
+		}
+
+		@Override
+		public Status appendBatch(List<AvroFlumeEvent> events) {
+			for (AvroFlumeEvent avroEvent : events) {
+				collect(avroEvent);
+			}
+
+			return Status.OK;
+		}
+
+		/**
+		 * Deserializes the event body and emits the resulting tuple unless
+		 * closeWithoutSend was requested; marks the source as done if
+		 * sendAndClose was requested.
+		 */
+		private void collect(AvroFlumeEvent avroEvent){
+			// Copy exactly the readable bytes. ByteBuffer.array() would expose
+			// the whole backing array, ignoring position, limit and offset.
+			// duplicate() leaves the position of the event's own buffer untouched.
+			byte[] b = new byte[avroEvent.getBody().remaining()];
+			avroEvent.getBody().duplicate().get(b);
+
+			// deserialize() runs first because subclasses may request a
+			// shutdown from inside it.
+			IN tuple = FlumeSource.this.deserialize(b);
+			if(!closeWithoutSend){
+				collector.collect(tuple);
+			}
+			if(sendAndClose){
+				sendDone = true;
+			}
+		}
+
+	}
+
+	MyAvroSource avroSource;
+	// The flags are written by the threads delivering events and read by the
+	// thread blocked in invoke(), hence volatile.
+	private volatile boolean closeWithoutSend = false;
+	private volatile boolean sendAndClose = false;
+	private volatile boolean sendDone = false;
+
+	/**
+	 * Converts the body of a received Flume event into a tuple.
+	 *
+	 * @param msg
+	 *            the event body
+	 * @return the tuple to emit
+	 */
+	public abstract IN deserialize(byte[] msg);
+
+	/**
+	 * Creates the embedded Avro source and configures it with the host and
+	 * port given to the constructor.
+	 *
+	 * @param collector
+	 *            collector that receives the deserialized tuples
+	 */
+	public void configureAvroSource(Collector<IN> collector){
+
+		avroSource = new MyAvroSource();
+		avroSource.collector=collector;
+		Context context = new Context();
+		context.put("port", port);
+		context.put("bind", host);
+		avroSource.configure(context);
+		// NOTE(review): created without a channel selector; presumably never
+		// used because append/appendBatch are overridden - confirm.
+		ChannelProcessor cp = new ChannelProcessor(null);
+		avroSource.setChannelProcessor(cp);
+	}
+
+	/**
+	 * Starts the Avro source and blocks until a shutdown is requested. The
+	 * Avro source is stopped on every exit path.
+	 *
+	 * @param collector
+	 *            collector that receives the deserialized tuples
+	 * @throws Exception
+	 *             if the waiting thread is interrupted
+	 */
+	@Override
+	public void invoke(Collector<IN> collector) throws Exception {
+		configureAvroSource(collector);
+		avroSource.start();
+		try {
+			// Sleep between checks instead of spinning at full speed.
+			while(!closeWithoutSend && !sendDone){
+				Thread.sleep(SHUTDOWN_POLL_INTERVAL_MS);
+			}
+		} finally {
+			avroSource.stop();
+		}
+	}
+
+	/**
+	 * Requests that the source stops after the next processed event has been
+	 * emitted.
+	 */
+	public void sendAndClose(){
+		sendAndClose = true;
+	}
+
+	/**
+	 * Requests that the source stops without emitting the event currently
+	 * being processed.
+	 */
+	public void closeWithoutSend(){
+		closeWithoutSend = true;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..e08837a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.commons.lang.SerializationUtils;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+
+/**
+ * Example job that wires a {@link FlumeSource} and a {@link FlumeSink} into a
+ * local streaming topology. The string "q" acts as the end-of-stream marker
+ * for both connectors.
+ */
+public class FlumeTopology {
+
+	/** Sink that ships the single string field of each tuple to Flume. */
+	public static class MyFlumeSink extends FlumeSink<Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		public MyFlumeSink(String host, int port) {
+			super(host, port);
+		}
+
+		/**
+		 * Java-serializes the string field; the marker "q" is still sent and
+		 * the connection is closed afterwards.
+		 */
+		@Override
+		public byte[] serialize(Tuple1<String> tuple) {
+			// Literal first, so a null field cannot cause a NullPointerException.
+			if ("q".equals(tuple.f0)) {
+				sendAndClose();
+			}
+			return SerializationUtils.serialize(tuple.f0);
+		}
+
+	}
+
+	/** Source that turns each received Flume event body back into a string tuple. */
+	public static class MyFlumeSource extends FlumeSource<Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		MyFlumeSource(String host, int port) {
+			super(host, port);
+		}
+
+		/**
+		 * Java-deserializes the event body into a string; the marker "q"
+		 * stops the source without being emitted.
+		 */
+		@Override
+		public Tuple1<String> deserialize(byte[] msg) {
+			// NOTE(review): Java-native deserialization of bytes received over
+			// the network is unsafe for untrusted senders; acceptable only in
+			// an example - consider a plain string encoding instead.
+			String s = (String) SerializationUtils.deserialize(msg);
+			Tuple1<String> out = new Tuple1<String>();
+			out.f0 = s;
+			if ("q".equals(s)) {
+				closeWithoutSend();
+			}
+			return out;
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(1);
+
+		// NOTE(review): the source listens on 41414 while the sink sends to
+		// 42424; presumably a Flume agent relays between the two - confirm.
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<String>> dataStream1 = env
+			.addSource(new MyFlumeSource("localhost", 41414))
+			.print();
+
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<String>> dataStream2 = env
+			.fromElements("one", "two", "three", "four", "five", "q")
+			.addSink(new MyFlumeSink("localhost", 42424));
+
+		env.execute();
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
deleted file mode 100644
index 00a8964..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.util.Properties;
-
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * This is a simple kafka producer that reads local file from disk and produces
- * line streams. To use the producer, a zookeeper server and a kafka server
- * should be in service. Run the following script to start a zookeeper server:
- * bin/zookeeper-server-start.sh config/zookeeper.properties Run the following
- * script to start a kafka server: bin/kafka-server-start.sh
- * config/server.properties Run the following script to start the producer: java
- * -cp kafka-0.8/libs/*:yourJarFile.jar
- * org.apache.flink.streaming.kafka.KafkaProducer yourTopicID kafkaServerIp As
- * an example: java -cp kafka-0.8/libs/*:flink-streaming.jar
- * org.apache.flink.streaming.kafka.KafkaProducer test localhost:9092
- */
-public class KafkaProducer {
- static kafka.javaapi.producer.Producer<Integer, String> producer;
- static Properties props = new Properties();
-
- public static void ProducerPrepare(String brokerAddr) {
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("metadata.broker.list", brokerAddr);
-
- producer = new kafka.javaapi.producer.Producer<Integer, String>(
- new ProducerConfig(props));
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length == 3) {
- String infilename = args[0];
- String topicId = args[1];
- String brokerAddr = args[2];
- ProducerPrepare(brokerAddr);
- BufferedReader reader = new BufferedReader(new FileReader(
- infilename));
- while (true) {
- String line = reader.readLine();
- if (line == null) {
- reader.close();
- reader = new BufferedReader(new FileReader(infilename));
- continue;
- }
- producer.send(new KeyedMessage<Integer, String>(topicId, line));
- }
- } else {
- System.out.println("please set filename!");
- System.exit(-1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index eeac961..e97cdd0 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -35,7 +35,8 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
static Properties props;
private String topicId;
private String brokerAddr;
- private boolean close = false;
+ private boolean sendAndClose = false;
+ private boolean closeWithoutSend = false;
private boolean initDone = false;
public KafkaSink(String topicId, String brokerAddr) {
@@ -63,18 +64,27 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
}
OUT out = serialize(tuple);
- KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
- producer.send(data);
- if (close) {
- producer.close();
+ KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(
+ topicId, out);
+
+ if (!closeWithoutSend) {
+ producer.send(data);
}
+ if (sendAndClose) {
+ producer.close();
+ }
}
public abstract OUT serialize(IN tuple);
- public void close() {
- close = true;
+ public void closeWithoutSend() {
+ producer.close();
+ closeWithoutSend = true;
+ }
+
+ public void sendAndClose() {
+ sendAndClose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index d34b6c3..32b11d9 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -24,21 +24,19 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
-
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
- * Source for reading messages from a Kafka queue.
- * The source currently only support string messages.
+ * Source for reading messages from a Kafka queue. The source currently only
+ * supports string messages.
*/
-public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<IN>{
+public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
@@ -46,7 +44,8 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
private final String topicId;
private final int numThreads;
private ConsumerConnector consumer;
- private boolean close = false;
+ private boolean closeWithoutSend = false;
+ private boolean sendAndClose = false;
IN outTuple;
@@ -81,20 +80,25 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
- IN out=deserialize(it.next().message());
- if(!close){
- collector.collect(out);
+ IN out = deserialize(it.next().message());
+ if (closeWithoutSend) {
+ break;
}
- else {
+ collector.collect(out);
+ if (sendAndClose) {
break;
}
}
consumer.shutdown();
}
-
+
public abstract IN deserialize(byte[] msg);
-
- public void close(){
- close=true;
+
+ public void closeWithoutSend() {
+ closeWithoutSend = true;
+ }
+
+ public void sendAndClose() {
+ sendAndClose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 1a77aee..e111feb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
public class KafkaTopology {
@@ -31,8 +31,8 @@ public class KafkaTopology {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Collector<Tuple1<String>> collector) throws Exception {
- // TODO Auto-generated method stub
+ public void invoke(Collector<Tuple1<String>> collector)
+ throws Exception {
for (int i = 0; i < 10; i++) {
collector.collect(new Tuple1<String>(Integer.toString(i)));
}
@@ -41,39 +41,37 @@ public class KafkaTopology {
}
}
- public static final class MyKafkaSource extends KafkaSource<Tuple1<String>, String> {
+ public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
private static final long serialVersionUID = 1L;
- public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
+ public MyKafkaSource(String zkQuorum, String groupId, String topicId,
+ int numThreads) {
super(zkQuorum, groupId, topicId, numThreads);
- // TODO Auto-generated constructor stub
}
@Override
public Tuple1<String> deserialize(byte[] msg) {
- // TODO Auto-generated method stub
String s = new String(msg);
if (s.equals("q")) {
- close();
+ closeWithoutSend();
}
return new Tuple1<String>(s);
}
}
- public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
+ public static final class MyKafkaSink extends
+ KafkaSink<Tuple1<String>, String> {
private static final long serialVersionUID = 1L;
public MyKafkaSink(String topicId, String brokerAddr) {
super(topicId, brokerAddr);
- // TODO Auto-generated constructor stub
}
@Override
public String serialize(Tuple1<String> tuple) {
- // TODO Auto-generated method stub
if (tuple.f0.equals("q")) {
- close();
+ sendAndClose();
}
return tuple.f0;
}
@@ -84,15 +82,19 @@ public class KafkaTopology {
public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> stream1 = env.addSource(
- new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM).print();
+ DataStream<Tuple1<String>> stream1 = env
+ .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
+ .print();
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> stream2 = env.addSource(new MySource()).addSink(
- new MyKafkaSink("test", "localhost:9092"));
+ DataStream<Tuple1<String>> stream2 = env
+ .addSource(new MySource())
+ .addSink(new MyKafkaSink("test", "localhost:9092"));
+
env.execute();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index e6e8de5..5440ea7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,47 +21,39 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-/**
- * Source for sending messages to a RabbitMQ queue. The source currently only
- * support string messages. Other types will be added soon.
- *
- */
-public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
- private static final Log LOG = LogFactory.getLog(RMQSink.class);
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
- private boolean close = false;
+ private boolean sendAndClose = false;
+ private boolean closeWithoutSend = false;
- private String queueName;
- private String hostName;
+ private String QUEUE_NAME;
+ private String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private boolean initDone = false;
- public RMQSink(String hostName, String queueName) {
- this.hostName = hostName;
- this.queueName = queueName;
+ public RMQSink(String HOST_NAME, String QUEUE_NAME) {
+ this.HOST_NAME = HOST_NAME;
+ this.QUEUE_NAME = QUEUE_NAME;
}
public void initializeConnection() {
factory = new ConnectionFactory();
- factory.setHost(hostName);
+ factory.setHost(HOST_NAME);
try {
connection = factory.newConnection();
channel = connection.createChannel();
+
} catch (IOException e) {
- new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
- + hostName, e);
+ e.printStackTrace();
}
initDone = true;
@@ -74,31 +66,39 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
}
try {
- channel.queueDeclare(queueName, false, false, false, null);
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
byte[] msg = serialize(tuple);
- channel.basicPublish("", queueName, null, msg);
- } catch (IOException e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send RMQ message to " + queueName + " at " + hostName);
+ if (!closeWithoutSend) {
+ channel.basicPublish("", QUEUE_NAME, null, msg);
}
+ } catch (IOException e) {
+ e.printStackTrace();
}
- if (close) {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Cannot close RMQ connection: " + queueName + " at " + hostName);
- }
- }
+ if (sendAndClose) {
+ closeChannel();
}
}
public abstract byte[] serialize(Tuple t);
- public void close() {
- close = true;
+ private void closeChannel() {
+ try {
+ channel.close();
+ connection.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void closeWithoutSend() {
+ closeChannel();
+ closeWithoutSend = true;
+ }
+
+ public void sendAndClose() {
+ sendAndClose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4b197e3..345d884 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,31 +21,24 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
-/**
- * Source for reading messages from a RabbitMQ queue. The source currently only
- * support string messages. Other types will be added soon.
- *
- */
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
- private static final Log LOG = LogFactory.getLog(RMQSource.class);
-
private static final long serialVersionUID = 1L;
private final String QUEUE_NAME;
private final String HOST_NAME;
- private boolean close = false;
+ private boolean closeWithoutSend = false;
+ private boolean sendAndClose = false;
private transient ConnectionFactory factory;
private transient Connection connection;
@@ -70,8 +63,6 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException e) {
- new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
- + HOST_NAME, e);
}
}
@@ -79,20 +70,29 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
public void invoke(Collector<IN> collector) throws Exception {
initializeConnection();
- while (!close) {
+ while (true) {
try {
delivery = consumer.nextDelivery();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
- }
+ } catch (ShutdownSignalException e) {
+ e.printStackTrace();
+ break;
+ } catch (ConsumerCancelledException e) {
+ e.printStackTrace();
+ break;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
outTuple = deserialize(delivery.getBody());
- if (!close){
+ if (closeWithoutSend) {
+ break;
+ } else {
collector.collect(outTuple);
}
+ if (sendAndClose) {
+ break;
+ }
}
try {
@@ -105,8 +105,12 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
public abstract IN deserialize(byte[] t);
- public void close() {
- close = true;
+ public void closeWithoutSend() {
+ closeWithoutSend = true;
+ }
+
+ public void sendAndClose() {
+ sendAndClose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 4b7c5dc..f84505b 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,10 +19,8 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.commons.lang.SerializationUtils;
+
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
@@ -33,16 +31,14 @@ public class RMQTopology {
public static final class MyRMQSink extends RMQSink<Tuple1<String>> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
- // TODO Auto-generated constructor stub
}
private static final long serialVersionUID = 1L;
@Override
public byte[] serialize(Tuple t) {
- // TODO Auto-generated method stub
if (t.getField(0).equals("q")) {
- close();
+ sendAndClose();
}
return SerializationUtils.serialize((String) t.getField(0));
}
@@ -53,7 +49,6 @@ public class RMQTopology {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
- // TODO Auto-generated constructor stub
}
private static final long serialVersionUID = 1L;
@@ -64,28 +59,28 @@ public class RMQTopology {
Tuple1<String> out = new Tuple1<String>();
out.f0 = s;
if (s.equals("q")) {
- close();
+ closeWithoutSend();
}
return out;
}
}
- @SuppressWarnings("unused")
- private static Set<String> result = new HashSet<String>();
-
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream1 = env.addSource(
- new MyRMQSource("localhost", "hello")).print();
+ DataStream<Tuple1<String>> dataStream1 = env
+ .addSource(new MyRMQSource("localhost", "hello"))
+ .print();
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream2 = env.fromElements("one", "two", "three", "four",
- "five", "q").addSink(new MyRMQSink("localhost", "hello"));
+ DataStream<Tuple1<String>> dataStream2 = env
+ .fromElements("one", "two", "three", "four", "five", "q")
+ .addSink(new MyRMQSink("localhost", "hello"));
env.execute();
}
-}
\ No newline at end of file
+}