You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:55 UTC
[18/51] [abbrv] git commit: [streaming] Javadocs for connectors
[streaming] Javadocs for connectors
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2f704aeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2f704aeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2f704aeb
Branch: refs/heads/master
Commit: 2f704aeb3eb94a7b24641408699d54cd40a4d8cb
Parents: 7c5bc3c
Author: jfeher <fe...@gmail.com>
Authored: Fri Aug 1 11:22:31 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 51 +++++++++++++++---
.../streaming/connectors/flume/FlumeSource.java | 57 ++++++++++++++++++--
.../connectors/flume/FlumeTopology.java | 2 +-
.../streaming/connectors/kafka/KafkaSink.java | 22 ++++++++
.../streaming/connectors/kafka/KafkaSource.java | 36 +++++++++----
.../streaming/connectors/rabbitmq/RMQSink.java | 29 +++++++++-
.../connectors/rabbitmq/RMQSource.java | 27 +++++++++-
7 files changed, 201 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 3379fcb..20a3a4a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -21,21 +21,22 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(RMQSource.class);
-
+
private transient FlinkRpcClientFacade client;
boolean initDone = false;
String host;
@@ -48,6 +49,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
this.port = port;
}
+ /**
+ * Receives tuples from the Apache Flink {@link DataStream} and forwards them to
+ * Apache Flume.
+ *
+ * @param tuple
+ * The tuple arriving from the datastream
+ */
@Override
public void invoke(IN tuple) {
@@ -66,6 +74,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
}
+ /**
+ * Serializes tuples into byte arrays.
+ *
+ * @param tuple
+ * The tuple used for the serialization
+ * @return The serialized byte array.
+ */
public abstract byte[] serialize(IN tuple);
private class FlinkRpcClientFacade {
@@ -73,6 +88,14 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
private String hostname;
private int port;
+ /**
+ * Initializes the connection to Apache Flume.
+ *
+ * @param hostname
+ * The host
+ * @param port
+ * The port.
+ */
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
@@ -80,12 +103,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
int initCounter = 0;
while (true) {
if (initCounter >= 90) {
- System.exit(1);
+ new RuntimeException("Cannot establish connection with" + port + " at " + host);
}
try {
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
} catch (FlumeException e) {
- // Wait one second if the connection failed before the next try
+ // Wait one second if the connection failed before the next
+ // try
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
@@ -102,6 +126,12 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
initDone = true;
}
+ /**
+ * Sends byte arrays as {@link Event} series to Apache Flume.
+ *
+ * @param data
+ * The byte array to send to Apache Flume
+ */
public void sendDataToFlume(byte[] data) {
Event event = EventBuilder.withBody(data);
@@ -116,16 +146,25 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
}
}
+ /**
+ * Closes the RpcClient.
+ */
public void close() {
client.close();
}
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
client.close();
closeWithoutSend = true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 5b90e14..8b102a8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -26,8 +26,8 @@ import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;
-
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
@@ -45,12 +45,28 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
public class MyAvroSource extends AvroSource {
Collector<IN> collector;
+ /**
+ * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+ * {@link DataStream}.
+ *
+ * @param avroEvent
+ * The event that should be sent to the dataStream
+ * @return A {@link Status}.OK message if sending the event was successful.
+ */
@Override
public Status append(AvroFlumeEvent avroEvent) {
collect(avroEvent);
return Status.OK;
}
+ /**
+ * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+ * {@link DataStream}.
+ *
+ * @param events
+ * The events that are sent to the dataStream
+ * @return A Status.OK message if sending the events was successful.
+ */
@Override
public Status appendBatch(List<AvroFlumeEvent> events) {
for (AvroFlumeEvent avroEvent : events) {
@@ -60,6 +76,13 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
return Status.OK;
}
+ /**
+ * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+ * {@link DataStream}.
+ *
+ * @param avroEvent
+ * The event that is sent to the dataStream
+ */
private void collect(AvroFlumeEvent avroEvent) {
byte[] b = avroEvent.getBody().array();
IN tuple = FlumeSource.this.deserialize(b);
@@ -78,8 +101,22 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
private boolean sendAndClose = false;
private volatile boolean sendDone = false;
- public abstract IN deserialize(byte[] msg);
-
+ /**
+ * Deserializes the incoming data.
+ *
+ * @param message
+ * The incoming message in a byte array
+ * @return The deserialized message in the required format.
+ */
+ public abstract IN deserialize(byte[] message);
+
+ /**
+ * Configures the AvroSource. Also sets the collector so the application can
+ * use it from outside of the invoke function.
+ *
+ * @param collector
+ * The collector used in the invoke function
+ */
public void configureAvroSource(Collector<IN> collector) {
avroSource = new MyAvroSource();
@@ -88,10 +125,18 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
context.put("port", port);
context.put("bind", host);
avroSource.configure(context);
+ // An instance of a ChannelProcessor is required for configuring the
+ // avroSource although it will not be used in this case.
ChannelProcessor cp = new ChannelProcessor(null);
avroSource.setChannelProcessor(cp);
}
+ /**
+ * Configures the AvroSource and runs until the user calls a close function.
+ *
+ * @param collector
+ * The Collector for sending data to the datastream
+ */
@Override
public void invoke(Collector<IN> collector) throws Exception {
configureAvroSource(collector);
@@ -104,10 +149,16 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
avroSource.stop();
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
closeWithoutSend = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 779a5fb..3c45cd4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -39,7 +39,7 @@ public class FlumeTopology {
try {
sendAndClose();
} catch (Exception e) {
- new RuntimeException("Error while closing RMQ connection with " + port + " at "
+ new RuntimeException("Error while closing Flume connection with " + port + " at "
+ host, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 44867ef..7e3f3db 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -45,6 +45,9 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
}
+ /**
+ * Initializes the connection to Kafka.
+ */
public void initialize() {
props = new Properties();
@@ -57,6 +60,12 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
initDone = true;
}
+ /**
+ * Called when new data arrives at the sink, and forwards it to Kafka.
+ *
+ * @param tuple
+ * The incoming data
+ */
@Override
public void invoke(IN tuple) {
if (!initDone) {
@@ -75,13 +84,26 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
}
}
+ /**
+ * Serializes tuples into byte arrays.
+ *
+ * @param tuple
+ * The tuple used for the serialization
+ * @return The serialized byte array.
+ */
public abstract OUT serialize(IN tuple);
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
producer.close();
closeWithoutSend = true;
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 32b11d9..623e3b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -28,14 +28,12 @@ import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
+
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-/**
- * Source for reading messages from a Kafka queue. The source currently only
- * support string messages.
- */
public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
private static final long serialVersionUID = 1L;
@@ -49,14 +47,16 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
IN outTuple;
- public KafkaSource(String zkQuorum, String groupId, String topicId,
- int numThreads) {
+ public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
this.zkQuorum = zkQuorum;
this.groupId = groupId;
this.topicId = topicId;
this.numThreads = numThreads;
}
+ /**
+ * Initializes the connection to Kafka.
+ */
private void initializeConnection() {
Properties props = new Properties();
props.put("zookeeper.connect", zkQuorum);
@@ -64,10 +64,15 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
- consumer = kafka.consumer.Consumer
- .createJavaConsumerConnector(new ConsumerConfig(props));
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
}
+ /**
+ * Called to forward the data from the source to the {@link DataStream}.
+ *
+ * @param collector
+ * The Collector for sending data to the dataStream
+ */
@Override
public void invoke(Collector<IN> collector) throws Exception {
initializeConnection();
@@ -92,12 +97,25 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
consumer.shutdown();
}
- public abstract IN deserialize(byte[] msg);
+ /**
+ * Deserializes the incoming data.
+ *
+ * @param message
+ * The incoming message in a byte array
+ * @return The deserialized message in the required format.
+ */
+ public abstract IN deserialize(byte[] message);
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
closeWithoutSend = true;
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 663fc13..d7ed17a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -50,6 +50,9 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
this.QUEUE_NAME = QUEUE_NAME;
}
+ /**
+ * Initializes the connection to RMQ.
+ */
public void initializeConnection() {
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
@@ -64,6 +67,12 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
initDone = true;
}
+ /**
+ * Called when new data arrives at the sink, and forwards it to RMQ.
+ *
+ * @param tuple
+ * The incoming data
+ */
@Override
public void invoke(IN tuple) {
if (!initDone) {
@@ -87,8 +96,18 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
}
}
- public abstract byte[] serialize(Tuple t);
-
+ /**
+ * Serializes tuples into byte arrays.
+ *
+ * @param tuple
+ * The tuple used for the serialization
+ * @return The serialized byte array.
+ */
+ public abstract byte[] serialize(Tuple tuple);
+
+ /**
+ * Closes the connection.
+ */
private void closeChannel() {
try {
channel.close();
@@ -100,11 +119,17 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
}
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
closeChannel();
closeWithoutSend = true;
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4fd2235..dfea55a 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -30,6 +30,7 @@ import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
@@ -56,6 +57,9 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
this.QUEUE_NAME = QUEUE_NAME;
}
+ /**
+ * Initializes the connection to RMQ.
+ */
private void initializeConnection() {
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
@@ -71,6 +75,12 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
}
}
+ /**
+ * Called to forward the data from the source to the {@link DataStream}.
+ *
+ * @param collector
+ * The Collector for sending data to the dataStream
+ */
@Override
public void invoke(Collector<IN> collector) throws Exception {
initializeConnection();
@@ -105,12 +115,25 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
}
- public abstract IN deserialize(byte[] t);
-
+ /**
+ * Deserializes the incoming data.
+ *
+ * @param message
+ * The incoming message in a byte array
+ * @return The deserialized message in the required format.
+ */
+ public abstract IN deserialize(byte[] message);
+
+ /**
+ * Closes the connection immediately and no further data will be sent.
+ */
public void closeWithoutSend() {
closeWithoutSend = true;
}
+ /**
+ * Closes the connection only when the next message is sent after this call.
+ */
public void sendAndClose() {
sendAndClose = true;
}