Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:38 UTC

[05/12] git commit: [FLINK-1104] [streaming] Eliminated Tuple1 from connectors

[FLINK-1104] [streaming] Eliminated Tuple1 from connectors


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7cc24006
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7cc24006
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7cc24006

Branch: refs/heads/master
Commit: 7cc24006a992d67cd3de54b867b38161f5b08d92
Parents: a3b0284
Author: ghermann <re...@gmail.com>
Authored: Sat Sep 20 13:55:13 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 66 ++++++++++----------
 .../connectors/kafka/KafkaTopology.java         | 31 +++++----
 .../function/source/GenSequenceFunction.java    |  4 +-
 3 files changed, 48 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
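
In short: the connector sources and sinks now use the element type directly instead of wrapping single values in a `Tuple1`. As a minimal before/after sketch of the user-facing deserializer callback (simplified from the diffs below; the close-on-"q" handling is omitted):

~~~java
// Before this commit: every record was wrapped in a single-field tuple.
public Tuple1<String> deserialize(byte[] msg) {
	return new Tuple1<String>(new String(msg));
}

// After this commit: the record type is used directly.
public String deserialize(byte[] msg) {
	return new String(msg);
}
~~~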


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index e2f5854..f1b4189 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -456,7 +456,7 @@ An abstract class providing an interface for receiving data from Kafka. By imple
  * Write a deserializer function which processes the data coming from Kafka,
  * Stop the source manually when necessary with one of the close functions.
 
-The implemented class must extend `KafkaSource`, for example: `KafkaSource<Tuple1<String>>`.
+The implemented class must extend `KafkaSource`, for example: `KafkaSource<String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -472,12 +472,12 @@ An example of an implementation of a deserializer:
 
 ~~~java
 @Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
 	String s = new String(msg);
 	if(s.equals("q")){
 		closeWithoutSend();
 	}
-	return new Tuple1<String>(s);
+	return s;
 }
 ~~~
 
@@ -494,7 +494,7 @@ An abstract class providing an interface for sending data to Kafka. By implement
  * Write a serializer function to send data in the desired form to Kafka,
  * Stop the sink manually when necessary with one of the close functions.
 
-The implemented class must extend `KafkaSink`, for example `KafkaSink<Tuple1<String>, String>`.
+The implemented class must extend `KafkaSink`, for example `KafkaSink<String, String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -510,11 +510,11 @@ An example of an implementation of a serializer:
 
 ~~~java
 @Override
-public String serialize(Tuple1<String> tuple) {
-	if(tuple.f0.equals("q")){
+public String serialize(String tuple) {
+	if(tuple.equals("q")){
 		sendAndClose();
 	}
-	return tuple.f0;
+	return tuple;
 }
 ~~~
 
@@ -524,8 +524,8 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
 #### Building A Topology
 To use a Kafka connector as a source in Flink, call the `addSource()` function with a new instance of the class which extends `KafkaSource` as a parameter:
 
 ~~~java
-DataStream<Tuple1<String>> stream1 = env.
+DataStream<String> stream1 = env.
 	addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
 	.print();
 ~~~
@@ -539,8 +539,8 @@ The followings have to be provided for the `MyKafkaSource()` constructor in orde
 
 Similarly, to use a Kafka connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
 
 ~~~java
-DataStream<Tuple1<String>> stream2 = env
+DataStream<String> stream2 = env
 	.addSource(new MySource())
 	.addSink(new MyKafkaSink("test", "localhost:9092"));
 ~~~
@@ -567,7 +567,7 @@ An abstract class providing an interface for receiving data from Flume. By imple
  * Write a deserializer function which processes the data coming from Flume,
  * Stop the source manually when necessary with one of the close functions.
 
-The implemented class must extend `FlumeSource` for example: `FlumeSource<Tuple1<String>>`
+The implemented class must extend `FlumeSource`, for example: `FlumeSource<String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -583,10 +583,9 @@ An example of an implementation of a deserializer:
 
 ~~~java
 @Override
-public Tuple1<String> deserialize(byte[] msg) {
+public String deserialize(byte[] msg) {
 	String s = (String) SerializationUtils.deserialize(msg);
-	Tuple1<String> out = new Tuple1<String>();
-	out.f0 = s;
+	String out = s;
 	if (s.equals("q")) {
 		closeWithoutSend();
 	}
@@ -607,7 +606,7 @@ An abstract class providing an interface for sending data to Flume. By implement
 * Write a serializer function to send data in the desired form to Flume,
 * Stop the sink manually when necessary with one of the close functions.
 
-The implemented class must extend `FlumeSink`, for example `FlumeSink<Tuple1<String>, String>`.
+The implemented class must extend `FlumeSink`, for example `FlumeSink<String, String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -623,8 +622,8 @@ An example of an implementation of a serializer.
 
 ~~~java
 @Override
-public byte[] serialize(Tuple1<String> tuple) {
-	if (tuple.f0.equals("q")) {
+public byte[] serialize(String tuple) {
+	if (tuple.equals("q")) {
 		try {
 			sendAndClose();
 		} catch (Exception e) {
@@ -632,7 +631,7 @@ public byte[] serialize(Tuple1<String> tuple) {
 				+ host, e);
 		}
 	}
-	return SerializationUtils.serialize(tuple.f0);
+	return SerializationUtils.serialize(tuple);
 }
 ~~~
 
@@ -642,8 +641,8 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
 #### Building A Topology
 To use a Flume connector as a source in Flink, call the `addSource()` function with a new instance of the class which extends `FlumeSource` as a parameter:
 
 ~~~java
-DataStream<Tuple1<String>> dataStream1 = env
+DataStream<String> dataStream1 = env
 	.addSource(new MyFlumeSource("localhost", 41414))
 	.print();
 ~~~
@@ -655,8 +654,8 @@ The followings have to be provided for the `MyFlumeSource()` constructor in orde
 
 Similarly, to use a Flume connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `FlumeSink`:
 
 ~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+DataStream<String> dataStream2 = env
 	.fromElements("one", "two", "three", "four", "five", "q")
 	.addSink(new MyFlumeSink("localhost", 42424));
 ~~~
@@ -713,7 +712,7 @@ An abstract class providing an interface for receiving data from RabbitMQ. By im
 * Write a deserializer function which processes the data coming from RabbitMQ,
 * Stop the source manually when necessary with one of the close functions.
 
-The implemented class must extend `RabbitMQSource` for example: `RabbitMQSource<Tuple1<String>>`
+The implemented class must extend `RabbitMQSource`, for example: `RabbitMQSource<String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -729,10 +728,9 @@ An example of an implementation of a deserializer:
 
 ~~~java
 @Override
-public Tuple1<String> deserialize(byte[] t) {
+public String deserialize(byte[] t) {
 	String s = (String) SerializationUtils.deserialize(t);
-	Tuple1<String> out = new Tuple1<String>();
-	out.f0 = s;
+	String out = s;
 	if (s.equals("q")) {
 		closeWithoutSend();
 	}
@@ -755,7 +753,7 @@ An abstract class providing an interface for sending data to RabbitMQ. By implem
 * Write a serializer function to send data in the desired form to RabbitMQ,
 * Stop the sink manually when necessary with one of the close functions.
 
-The implemented class must extend `RabbitMQSink` for example: `RabbitMQSink<Tuple1<String>, String>`
+The implemented class must extend `RabbitMQSink`, for example: `RabbitMQSink<String, String>`.
 
 ##### Constructor
 An example of an implementation of a constructor:
@@ -787,7 +785,7 @@ To use a RabbitMQ connector as a source in Flink call the `addSource()` function
 
 ~~~java
 @SuppressWarnings("unused")
-DataStream<Tuple1<String>> dataStream1 = env
+DataStream<String> dataStream1 = env
 	.addSource(new MyRMQSource("localhost", "hello"))
 	.print();
 ~~~
@@ -799,8 +797,8 @@ The followings have to be provided for the `MyRabbitMQSource()` constructor in o
 
 Similarly, to use a RabbitMQ connector as a sink in Flink, call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`:
 
 ~~~java
-DataStream<Tuple1<String>> dataStream2 = env
+DataStream<String> dataStream2 = env
 	.fromElements("one", "two", "three", "four", "five", "q")
 	.addSink(new MyRMQSink("localhost", "hello"));
 ~~~
@@ -835,7 +833,7 @@ To run the container type:
 sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
 ~~~
 
-Now a terminal started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's port so RabbitMQ can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test-run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's ports so RabbitMQ can communicate with the application through these.
 
 To start the RabbitMQ server:
 
@@ -877,7 +875,7 @@ To run the container type:
 sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i flinkstreaming/flink-connectors-kafka
 ~~~
 
-Now a terminal started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's port so Kafka can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test-run the Kafka connector. The -p flags bind the localhost's and the Docker container's ports so Kafka can communicate with the application through these.
 First start a zookeeper in the background:
 
 ~~~batch
@@ -931,7 +929,7 @@ To run the container type:
 sudo docker run -t -i flinkstreaming/flink-connectors-flume
 ~~~
 
-Now a terminal started running from the image with all the necessary configurations to test run the Flume connector. The -p flag binds the localhost's and the Docker container's port so flume can communicate with the application through this.
+Now a terminal is running from the image with all the necessary configurations to test-run the Flume connector.
 
 To have the latest version of Flink type:
 ~~~batch

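For readers updating their own connectors, the Flume source fragments above assemble into a single class like this after the change. This is a sketch under stated assumptions: the `FlumeSource` constructor signature is inferred from the `MyFlumeSource("localhost", 41414)` call in the guide, and `SerializationUtils` is taken to be the Commons Lang 3 class used in the snippets.

~~~java
import org.apache.commons.lang3.SerializationUtils;

public class MyFlumeSource extends FlumeSource<String> {
	private static final long serialVersionUID = 1L;

	// Host and port are assumed to be the two constructor parameters,
	// matching the MyFlumeSource("localhost", 41414) usage in the guide.
	public MyFlumeSource(String host, int port) {
		super(host, port);
	}

	@Override
	public String deserialize(byte[] msg) {
		String s = (String) SerializationUtils.deserialize(msg);
		if (s.equals("q")) {
			// Stop the source manually on the closing message.
			closeWithoutSend();
		}
		return s;
	}
}
~~~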
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 79f0b74..089efad 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -29,20 +28,20 @@ import org.slf4j.LoggerFactory;
 public class KafkaTopology {
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
 	
-	public static final class MySource implements SourceFunction<Tuple1<String>> {
+	public static final class MySource implements SourceFunction<String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+		public void invoke(Collector<String> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
-				collector.collect(new Tuple1<String>(Integer.toString(i)));
+				collector.collect(Integer.toString(i));
 			}
-			collector.collect(new Tuple1<String>("q"));
+			collector.collect("q");
 
 		}
 	}
 
-	public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
+	public static final class MyKafkaSource extends KafkaSource<String> {
 		private static final long serialVersionUID = 1L;
 
 		public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
@@ -50,17 +49,17 @@ public class KafkaTopology {
 		}
 
 		@Override
-		public Tuple1<String> deserialize(byte[] msg) {
+		public String deserialize(byte[] msg) {
 			String s = new String(msg);
 			if (s.equals("q")) {
 				closeWithoutSend();
 			}
-			return new Tuple1<String>(s);
+			return s;
 		}
 
 	}
 
-	public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
+	public static final class MyKafkaSink extends KafkaSink<String, String> {
 		private static final long serialVersionUID = 1L;
 
 		public MyKafkaSink(String topicId, String brokerAddr) {
@@ -68,20 +67,20 @@ public class KafkaTopology {
 		}
 
 		@Override
-		public String serialize(Tuple1<String> tuple) {
-			if (tuple.f0.equals("q")) {
+		public String serialize(String tuple) {
+			if (tuple.equals("q")) {
 				sendAndClose();
 			}
-			return tuple.f0;
+			return tuple;
 		}
 
 	}
 	
-	public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
+	public static final class MyKafkaPrintSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<String> value) {
+		public void invoke(String value) {
 			if (LOG.isInfoEnabled()) {
 				LOG.info("String: <{}> arrived from Kafka", value);
 			}
@@ -95,12 +94,12 @@ public class KafkaTopology {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> stream1 = env
+		DataStream<String> stream1 = env
 			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
 			.addSink(new MyKafkaPrintSink());
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> stream2 = env
+		DataStream<String> stream2 = env
 			.addSource(new MySource())
 			.addSink(new MyKafkaSink("test", "localhost:9092"));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7cc24006/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index acd63c2..69601ff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -17,8 +17,7 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
+import org.apache.flink.util.Collector;
 
 /**
  * Source Function used to generate the number sequence
@@ -30,7 +29,6 @@ public class GenSequenceFunction implements SourceFunction<Long> {
 
 	long from;
 	long to;
-	Tuple1<Long> outTuple = new Tuple1<Long>();
 
 	public GenSequenceFunction(long from, long to) {
 		this.from = from;
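The `invoke()` body of `GenSequenceFunction` falls outside the hunks shown in this message. With the `Tuple1<Long>` field gone, it can collect the sequence values directly; a plausible post-change sketch (the actual method body is not part of this excerpt):

~~~java
@Override
public void invoke(Collector<Long> collector) throws Exception {
	// Emit the numbers directly; no Tuple1<Long> wrapper is needed anymore.
	for (long i = from; i <= to; i++) {
		collector.collect(i);
	}
}
~~~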