You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:38 UTC
[05/12] git commit: [FLINK-1104] [streaming] Eliminated Tuple1 from
connectors
[FLINK-1104] [streaming] Eliminated Tuple1 from connectors
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7cc24006
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7cc24006
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7cc24006
Branch: refs/heads/master
Commit: 7cc24006a992d67cd3de54b867b38161f5b08d92
Parents: a3b0284
Author: ghermann <re...@gmail.com>
Authored: Sat Sep 20 13:55:13 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200
----------------------------------------------------------------------
docs/streaming_guide.md | 66 ++++++++++----------
.../connectors/kafka/KafkaTopology.java | 31 +++++----
.../function/source/GenSequenceFunction.java | 4 +-
3 files changed, 48 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index e2f5854..f1b4189 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -456,7 +456,7 @@ An abstract class providing an interface for receiving data from Kafka. By imple
* Write a deserializer function which processes the data coming from Kafka,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `KafkaSource`, for example: `KafkaSource<Tuple1<String>>`.
+The implemented class must extend `KafkaSource`, for example: `KafkaSource<String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -472,12 +472,12 @@ An example of an implementation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
String s = new String(msg);
if(s.equals("q")){
closeWithoutSend();
}
- return new Tuple1<String>(s);
+ return new String(s);
}
~~~
@@ -494,7 +494,7 @@ An abstract class providing an interface for sending data to Kafka. By implement
* Write a serializer function to send data in the desired form to Kafka,
* Stop the sink manually when necessary with one of the close functions.
-The implemented class must extend `KafkaSink`, for example `KafkaSink<Tuple1<String>, String>`.
+The implemented class must extend `KafkaSink`, for example: `KafkaSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -510,11 +510,11 @@ An example of an implementation of a serializer:
~~~java
@Override
-public String serialize(Tuple1<String> tuple) {
- if(tuple.f0.equals("q")){
+public String serialize(String tuple) {
+ if(tuple.equals("q")){
sendAndClose();
}
- return tuple.f0;
+ return tuple;
}
~~~
@@ -524,8 +524,8 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
#### Building A Topology
To use a Kafka connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `KafkaSource` as parameter:
-~~~java
-DataStream<Tuple1<String>> stream1 = env.
+~~~java
+DataStream<String> stream1 = env.
addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
~~~
@@ -539,8 +539,8 @@ The followings have to be provided for the `MyKafkaSource()` constructor in orde
Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
-~~~java
-DataStream<Tuple1<String>> stream2 = env
+~~~java
+DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
~~~
@@ -567,7 +567,7 @@ An abstract class providing an interface for receiving data from Flume. By imple
* Write a deserializer function which processes the data coming from Flume,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `FlumeSource` for example: `FlumeSource<Tuple1<String>>`
+The implemented class must extend `FlumeSource`, for example: `FlumeSource<String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -583,10 +583,9 @@ An example of an implementation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
String s = (String) SerializationUtils.deserialize(msg);
- Tuple1<String> out = new Tuple1<String>();
- out.f0 = s;
+ String out = s;
if (s.equals("q")) {
closeWithoutSend();
}
@@ -607,7 +606,7 @@ An abstract class providing an interface for sending data to Flume. By implement
* Write a serializer function to send data in the desired form to Flume,
* Stop the sink manually when necessary with one of the close functions.
-The implemented class must extend `FlumeSink`, for example `FlumeSink<Tuple1<String>, String>`.
+The implemented class must extend `FlumeSink`, for example: `FlumeSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -623,8 +622,8 @@ An example of an implementation of a serializer.
~~~java
@Override
-public byte[] serialize(Tuple1<String> tuple) {
- if (tuple.f0.equals("q")) {
+public byte[] serialize(String tuple) {
+ if (tuple.equals("q")) {
try {
sendAndClose();
} catch (Exception e) {
@@ -632,7 +631,7 @@ public byte[] serialize(Tuple1<String> tuple) {
+ host, e);
}
}
- return SerializationUtils.serialize(tuple.f0);
+ return SerializationUtils.serialize(tuple);
}
~~~
@@ -642,8 +641,8 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
#### Building A Topology
To use a Flume connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `FlumeSource` as parameter:
-~~~java
-DataStream<Tuple1<String>> dataStream1 = env
+~~~java
+DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
.print();
~~~
@@ -655,8 +654,8 @@ The followings have to be provided for the `MyFlumeSource()` constructor in orde
Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `FlumeSink`
-~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+~~~java
+DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyFlumeSink("localhost", 42424));
~~~
@@ -713,7 +712,7 @@ An abstract class providing an interface for receiving data from RabbitMQ. By im
* Write a deserializer function which processes the data coming from RabbitMQ,
* Stop the source manually when necessary with one of the close functions.
-The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource<Tuple1<String>>`
+The implemented class must extend `RabbitMQSource`, for example: `RabbitMQSource<String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -729,10 +728,9 @@ An example of an implemetation of a deserializer:
~~~java
@Override
-public Tuple1<String> deserialize(byte[] t) {
+public String deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
- Tuple1<String> out = new Tuple1<String>();
- out.f0 = s;
+ String out = s;
if (s.equals("q")) {
closeWithoutSend();
}
@@ -755,7 +753,7 @@ An abstract class providing an interface for sending data to RabbitMQ. By implem
* Write a serializer function to send data in the desired form to RabbitMQ
* Stop the sink manually when necessary with one of the close functions
-The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink<Tuple1<String>, String>`
+The implemented class must extend `RabbitMQSink`, for example: `RabbitMQSink<String, String>`.
##### Constructor
An example of an implementation of a constructor:
@@ -787,7 +785,7 @@ To use a RabbitMQ connector as a source in Flink call the `addSource()` function
~~~java
@SuppressWarnings("unused")
-DataStream<Tuple1<String>> dataStream1 = env
+DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
.print();
~~~
@@ -799,8 +797,8 @@ The followings have to be provided for the `MyRabbitMQSource()` constructor in o
Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`
-~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+~~~java
+DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
~~~
@@ -835,7 +833,7 @@ To run the container type:
sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's port so RabbitMQ can communicate with the application through this.
+Now a terminal started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's ports so RabbitMQ can communicate with the application through these.
To start the RabbitMQ server:
@@ -877,7 +875,7 @@ To run the container type:
sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i flinkstreaming/flink-connectors-kafka
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's port so Kafka can communicate with the application through this.
+Now a terminal started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's ports so Kafka can communicate with the application through these.
First start a zookeeper in the background:
~~~batch
@@ -931,7 +929,7 @@ To run the container type:
sudo docker run -t -i flinkstreaming/flink-connectors-flume
~~~
-Now a terminal started running from the image with all the necessary configurations to test run the Flume connector. The -p flag binds the localhost's and the Docker container's port so flume can communicate with the application through this.
+Now a terminal started running from the image with all the necessary configurations to test run the Flume connector. The -p flag binds the localhost's and the Docker container's ports so Flume can communicate with the application through these.
To have the latest version of Flink type:
~~~batch
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 79f0b74..089efad 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -29,20 +28,20 @@ import org.slf4j.LoggerFactory;
public class KafkaTopology {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
- public static final class MySource implements SourceFunction<Tuple1<String>> {
+ public static final class MySource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+ public void invoke(Collector<String> collector) throws Exception {
for (int i = 0; i < 10; i++) {
- collector.collect(new Tuple1<String>(Integer.toString(i)));
+ collector.collect(new String(Integer.toString(i)));
}
- collector.collect(new Tuple1<String>("q"));
+ collector.collect(new String("q"));
}
}
- public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
+ public static final class MyKafkaSource extends KafkaSource<String> {
private static final long serialVersionUID = 1L;
public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
@@ -50,17 +49,17 @@ public class KafkaTopology {
}
@Override
- public Tuple1<String> deserialize(byte[] msg) {
+ public String deserialize(byte[] msg) {
String s = new String(msg);
if (s.equals("q")) {
closeWithoutSend();
}
- return new Tuple1<String>(s);
+ return new String(s);
}
}
- public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
+ public static final class MyKafkaSink extends KafkaSink<String, String> {
private static final long serialVersionUID = 1L;
public MyKafkaSink(String topicId, String brokerAddr) {
@@ -68,20 +67,20 @@ public class KafkaTopology {
}
@Override
- public String serialize(Tuple1<String> tuple) {
- if (tuple.f0.equals("q")) {
+ public String serialize(String tuple) {
+ if (tuple.equals("q")) {
sendAndClose();
}
- return tuple.f0;
+ return tuple;
}
}
- public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
+ public static final class MyKafkaPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple1<String> value) {
+ public void invoke(String value) {
if (LOG.isInfoEnabled()) {
LOG.info("String: <{}> arrived from Kafka", value);
}
@@ -95,12 +94,12 @@ public class KafkaTopology {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> stream1 = env
+ DataStream<String> stream1 = env
.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> stream2 = env
+ DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index acd63c2..69601ff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -17,8 +17,7 @@
package org.apache.flink.streaming.api.function.source;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
+import org.apache.flink.util.Collector;
/**
* Source Function used to generate the number sequence
@@ -30,7 +29,6 @@ public class GenSequenceFunction implements SourceFunction<Long> {
long from;
long to;
- Tuple1<Long> outTuple = new Tuple1<Long>();
public GenSequenceFunction(long from, long to) {
this.from = from;