You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:06 UTC
[29/51] [abbrv] git commit: [streaming] Added support for simple types instead of Tuple1 in the API
[streaming] Added support for simple types instead of Tuple1 in the API
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1162caca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1162caca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1162caca
Branch: refs/heads/master
Commit: 1162caca857142f237c6c05a04e9f7f2afc89572
Parents: b3cd5fd
Author: gyfora <gy...@gmail.com>
Authored: Sat Aug 2 21:19:18 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:48 2014 +0200
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 6 +-
.../streaming/connectors/flume/FlumeSource.java | 19 +-
.../connectors/flume/FlumeTopology.java | 21 +-
.../streaming/connectors/kafka/KafkaSink.java | 3 +-
.../streaming/connectors/kafka/KafkaSource.java | 11 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 13 +-
.../connectors/rabbitmq/RMQSource.java | 16 +-
.../connectors/rabbitmq/RMQTopology.java | 23 +-
.../connectors/twitter/TwitterLocal.java | 9 +-
.../connectors/twitter/TwitterSource.java | 16 +-
.../connectors/twitter/TwitterStreaming.java | 17 +-
.../apache/flink/streaming/api/DataStream.java | 20 +-
.../streaming/api/IterativeDataStream.java | 5 +-
.../flink/streaming/api/StreamConfig.java | 15 +-
.../api/StreamExecutionEnvironment.java | 48 ++--
.../flink/streaming/api/StreamOperator.java | 3 +-
.../api/collector/DirectedStreamCollector.java | 15 +-
.../streaming/api/collector/OutputSelector.java | 20 +-
.../api/collector/StreamCollector.java | 28 +--
.../api/function/co/CoMapFunction.java | 3 +-
.../api/function/sink/PrintSinkFunction.java | 3 +-
.../api/function/sink/SinkFunction.java | 5 +-
.../api/function/sink/WriteFormat.java | 4 +-
.../api/function/sink/WriteFormatAsCsv.java | 4 +-
.../api/function/sink/WriteFormatAsText.java | 4 +-
.../api/function/sink/WriteSinkFunction.java | 4 +-
.../sink/WriteSinkFunctionByBatches.java | 3 +-
.../sink/WriteSinkFunctionByMillis.java | 3 +-
.../api/function/source/FileSourceFunction.java | 9 +-
.../api/function/source/FileStreamFunction.java | 9 +-
.../function/source/FromElementsFunction.java | 9 +-
.../function/source/GenSequenceFunction.java | 7 +-
.../api/function/source/SourceFunction.java | 3 +-
.../streaming/api/invokable/SinkInvokable.java | 11 +-
.../api/invokable/SourceInvokable.java | 3 +-
.../api/invokable/StreamComponentInvokable.java | 3 +-
.../api/invokable/StreamRecordInvokable.java | 3 +-
.../api/invokable/UserTaskInvokable.java | 4 +-
.../operator/BatchReduceInvokable.java | 7 +-
.../api/invokable/operator/FilterInvokable.java | 15 +-
.../invokable/operator/FlatMapInvokable.java | 12 +-
.../api/invokable/operator/MapInvokable.java | 11 +-
.../operator/StreamReduceInvokable.java | 3 +-
.../operator/WindowReduceInvokable.java | 9 +-
.../api/invokable/operator/co/CoInvokable.java | 3 +-
.../invokable/operator/co/CoMapInvokable.java | 12 +-
.../AbstractStreamComponent.java | 19 +-
.../api/streamcomponent/CoStreamTask.java | 22 +-
.../SingleInputAbstractStreamComponent.java | 25 +-
.../streamcomponent/StreamIterationSink.java | 6 +-
.../api/streamcomponent/StreamSink.java | 7 +-
.../api/streamcomponent/StreamTask.java | 4 +-
.../api/streamrecord/StreamRecord.java | 55 ++--
.../streamrecord/StreamRecordSerializer.java | 20 +-
.../partitioner/BroadcastPartitioner.java | 3 +-
.../partitioner/DistributePartitioner.java | 3 +-
.../partitioner/FieldsPartitioner.java | 5 +-
.../partitioner/ForwardPartitioner.java | 3 +-
.../partitioner/GlobalPartitioner.java | 3 +-
.../partitioner/ShufflePartitioner.java | 3 +-
.../partitioner/StreamPartitioner.java | 3 +-
.../util/serialization/FunctionTypeWrapper.java | 17 +-
.../util/serialization/ObjectTypeWrapper.java | 24 +-
.../serialization/TypeSerializerWrapper.java | 35 ++-
.../apache/flink/streaming/api/IterateTest.java | 23 +-
.../apache/flink/streaming/api/PrintTest.java | 2 +-
.../api/collector/DirectedOutputTest.java | 33 ++-
.../api/invokable/operator/CoMapTest.java | 27 +-
.../api/invokable/operator/FilterTest.java | 23 +-
.../api/invokable/operator/FlatMapTest.java | 80 +++---
.../api/invokable/operator/MapTest.java | 250 ++++++++-----------
.../api/streamcomponent/MockRecordWriter.java | 2 +-
.../partitioner/FieldsPartitionerTest.java | 4 +-
.../serialization/TypeSerializationTest.java | 50 ++--
.../examples/wordcount/WordCountCounter.java | 7 +-
.../examples/wordcount/WordCountLocal.java | 2 +-
.../examples/wordcount/WordCountSplitter.java | 12 +-
77 files changed, 577 insertions(+), 669 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 20a3a4a..6f943d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -21,10 +21,8 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
@@ -32,10 +30,10 @@ import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
-public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
+public abstract class FlumeSink<IN> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(RMQSource.class);
+ private static final Log LOG = LogFactory.getLog(FlumeSink.class);
private transient FlinkRpcClientFacade client;
boolean initDone = false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 8b102a8..b141efb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,17 +21,16 @@ package org.apache.flink.streaming.connectors.flume;
import java.util.List;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class FlumeSource<OUT> extends SourceFunction<OUT> {
private static final long serialVersionUID = 1L;
String host;
@@ -43,7 +42,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
}
public class MyAvroSource extends AvroSource {
- Collector<IN> collector;
+ Collector<OUT> collector;
/**
* Sends the AvroFlumeEvent from it's argument list to the Apache Flink
@@ -85,7 +84,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
*/
private void collect(AvroFlumeEvent avroEvent) {
byte[] b = avroEvent.getBody().array();
- IN tuple = FlumeSource.this.deserialize(b);
+ OUT tuple = FlumeSource.this.deserialize(b);
if (!closeWithoutSend) {
collector.collect(tuple);
}
@@ -108,7 +107,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
- public abstract IN deserialize(byte[] message);
+ public abstract OUT deserialize(byte[] message);
/**
* Configures the AvroSource. Also sets the collector so the application can
@@ -117,7 +116,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
* @param collector
* The collector used in the invoke function
*/
- public void configureAvroSource(Collector<IN> collector) {
+ public void configureAvroSource(Collector<OUT> collector) {
avroSource = new MyAvroSource();
avroSource.collector = collector;
@@ -138,7 +137,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
* The Collector for sending data to the datastream
*/
@Override
- public void invoke(Collector<IN> collector) throws Exception {
+ public void invoke(Collector<OUT> collector) throws Exception {
configureAvroSource(collector);
avroSource.start();
while (true) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3c45cd4..414795b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -20,13 +20,12 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
public class FlumeTopology {
- public static class MyFlumeSink extends FlumeSink<Tuple1<String>> {
+ public static class MyFlumeSink extends FlumeSink<String> {
private static final long serialVersionUID = 1L;
public MyFlumeSink(String host, int port) {
@@ -34,8 +33,8 @@ public class FlumeTopology {
}
@Override
- public byte[] serialize(Tuple1<String> tuple) {
- if (tuple.f0.equals("q")) {
+ public byte[] serialize(String tuple) {
+ if (tuple.equals("q")) {
try {
sendAndClose();
} catch (Exception e) {
@@ -43,12 +42,12 @@ public class FlumeTopology {
+ host, e);
}
}
- return SerializationUtils.serialize((String) tuple.getField(0));
+ return SerializationUtils.serialize(tuple);
}
}
- public static class MyFlumeSource extends FlumeSource<Tuple1<String>> {
+ public static class MyFlumeSource extends FlumeSource<String> {
private static final long serialVersionUID = 1L;
MyFlumeSource(String host, int port) {
@@ -56,14 +55,12 @@ public class FlumeTopology {
}
@Override
- public Tuple1<String> deserialize(byte[] msg) {
+ public String deserialize(byte[] msg) {
String s = (String) SerializationUtils.deserialize(msg);
- Tuple1<String> out = new Tuple1<String>();
- out.f0 = s;
if (s.equals("q")) {
closeWithoutSend();
}
- return out;
+ return s;
}
}
@@ -73,12 +70,12 @@ public class FlumeTopology {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream1 = env
+ DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
.print();
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream2 = env
+ DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyFlumeSink("localhost", 42424));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 7e3f3db..955e8dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -25,10 +25,9 @@ import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN> {
+public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private kafka.javaapi.producer.Producer<Integer, OUT> producer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 623e3b8..228069a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -29,12 +29,11 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class KafkaSource<OUT> extends SourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
@@ -45,7 +44,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
private boolean closeWithoutSend = false;
private boolean sendAndClose = false;
- IN outTuple;
+ OUT outTuple;
public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
this.zkQuorum = zkQuorum;
@@ -74,7 +73,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
* The Collector for sending data to the dataStream
*/
@Override
- public void invoke(Collector<IN> collector) throws Exception {
+ public void invoke(Collector<OUT> collector) throws Exception {
initializeConnection();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
@@ -85,7 +84,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
- IN out = deserialize(it.next().message());
+ OUT out = deserialize(it.next().message());
if (closeWithoutSend) {
break;
}
@@ -104,7 +103,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
- public abstract IN deserialize(byte[] message);
+ public abstract OUT deserialize(byte[] message);
/**
* Closes the connection immediately and no further data will be sent.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index d7ed17a..4225cd3 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,16 +21,15 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public abstract class RMQSink<IN> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(RMQSource.class);
@@ -103,7 +102,7 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
* The tuple used for the serialization
* @return The serialized byte array.
*/
- public abstract byte[] serialize(Tuple tuple);
+ public abstract byte[] serialize(IN tuple);
/**
* Closes the connection.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index dfea55a..8303b1a 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -23,18 +23,16 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class RMQSource<OUT> extends SourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(RMQSource.class);
@@ -50,7 +48,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
- IN outTuple;
+ OUT outTuple;
public RMQSource(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
@@ -82,7 +80,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
* The Collector for sending data to the dataStream
*/
@Override
- public void invoke(Collector<IN> collector) throws Exception {
+ public void invoke(Collector<OUT> collector) throws Exception {
initializeConnection();
while (true) {
@@ -122,7 +120,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
- public abstract IN deserialize(byte[] message);
+ public abstract OUT deserialize(byte[] message);
/**
* Closes the connection immediately and no further data will be sent.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 828c2fa..94ae43f 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -20,15 +20,12 @@
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.commons.lang.SerializationUtils;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
public class RMQTopology {
- public static final class MyRMQSink extends RMQSink<Tuple1<String>> {
+ public static final class MyRMQSink extends RMQSink<String> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
}
@@ -36,16 +33,16 @@ public class RMQTopology {
private static final long serialVersionUID = 1L;
@Override
- public byte[] serialize(Tuple t) {
- if (t.getField(0).equals("q")) {
+ public byte[] serialize(String t) {
+ if (t.equals("q")) {
sendAndClose();
}
- return SerializationUtils.serialize((String) t.getField(0));
+ return SerializationUtils.serialize((String) t);
}
}
- public static final class MyRMQSource extends RMQSource<Tuple1<String>> {
+ public static final class MyRMQSource extends RMQSource<String> {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
@@ -54,14 +51,12 @@ public class RMQTopology {
private static final long serialVersionUID = 1L;
@Override
- public Tuple1<String> deserialize(byte[] t) {
+ public String deserialize(byte[] t) {
String s = (String) SerializationUtils.deserialize(t);
- Tuple1<String> out = new Tuple1<String>();
- out.f0 = s;
if (s.equals("q")) {
closeWithoutSend();
}
- return out;
+ return s;
}
}
@@ -71,12 +66,12 @@ public class RMQTopology {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream1 = env
+ DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
.print();
@SuppressWarnings("unused")
- DataStream<Tuple1<String>> dataStream2 = env
+ DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 138fe05..cb868f5 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.twitter;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -41,7 +40,7 @@ public class TwitterLocal {
* FlatMapFunction to determine the language of tweets if possible
*/
public static class SelectLanguageFlatMap extends
- JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+ JSONParseFlatMap<String, String> {
private static final long serialVersionUID = 1L;
@@ -49,9 +48,9 @@ public class TwitterLocal {
* Select the language from the incoming JSON text
*/
@Override
- public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+ public void flatMap(String value, Collector<String> out) throws Exception {
- out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+ out.collect(colationOfNull(getField(value, "lang")));
}
/**
@@ -81,7 +80,7 @@ public class TwitterLocal {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- DataStream<Tuple1<String>> streamSource = env.addSource(new TwitterSource(path, 100),
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
SOURCE_PARALLELISM);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index bbff732..bc0995d 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -29,8 +29,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
@@ -46,9 +44,9 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
* Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
* It can connect to Twitter Streaming API, collect tweets and
*/
-public class TwitterSource extends SourceFunction<Tuple1<String>> {
+public class TwitterSource extends SourceFunction<String> {
- private static final Log LOG = LogFactory.getLog(DataStream.class);
+ private static final Log LOG = LogFactory.getLog(TwitterSource.class);
private static final long serialVersionUID = 1L;
private String authPath;
@@ -88,7 +86,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
*
*/
@Override
- public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+ public void invoke(Collector<String> collector) throws Exception {
initializeConnection();
@@ -169,7 +167,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
* @param collector
* @param piece
*/
- protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+ protected void collectMessages(Collector<String> collector, int piece) {
if (LOG.isInfoEnabled()) {
LOG.info("Collecting tweets");
@@ -189,7 +187,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
* @param collector
*
*/
- protected void collectMessages(Collector<Tuple1<String>> collector) {
+ protected void collectMessages(Collector<String> collector) {
if (LOG.isInfoEnabled()) {
LOG.info("Tweet-stream begins");
@@ -204,7 +202,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
* Put one tweet into the collector.
* @param collector
*/
- protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+ protected void collectOneMessage(Collector<String> collector) {
if (client.isDone()) {
if (LOG.isErrorEnabled()) {
LOG.error("Client connection closed unexpectedly: "
@@ -215,7 +213,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
try {
String msg = queue.poll(waitSec, TimeUnit.SECONDS);
if (msg != null) {
- collector.collect(new Tuple1<String>(msg));
+ collector.collect(msg);
} else {
if (LOG.isInfoEnabled()) {
LOG.info("Did not receive a message in " + waitSec
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 805bf06..ee986ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.twitter;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -47,21 +46,21 @@ public class TwitterStreaming {
}
public static class SelectDataFlatMap extends
- JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+ JSONParseFlatMap<String, Tuple5<Long, Long, String, String, String>> {
private static final long serialVersionUID = 1L;
@Override
- public void flatMap(Tuple1<String> value,
+ public void flatMap(String value,
Collector<Tuple5<Long, Long, String, String, String>> out)
throws Exception {
out.collect(new Tuple5<Long, Long, String, String, String>(
- convertDateString2Long(getField(value.f0, "id")),
- convertDateString2LongDate(getField(value.f0, "created_at")),
- colationOfNull(getField(value.f0, "user.name")),
- colationOfNull(getField(value.f0, "text")),
- getField(value.f0, "lang")));
+ convertDateString2Long(getField(value, "id")),
+ convertDateString2LongDate(getField(value, "created_at")),
+ colationOfNull(getField(value, "user.name")),
+ colationOfNull(getField(value, "text")),
+ getField(value, "lang")));
}
protected String colationOfNull(String in){
@@ -94,7 +93,7 @@ public class TwitterStreaming {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- DataStream<Tuple1<String>> streamSource = env.addSource(
+ DataStream<String> streamSource = env.addSource(
new TwitterSource(path,100), SOURCE_PARALLELISM);
DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 1bde6a6..d0f1294 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -70,7 +70,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
* The type of the DataStream, i.e., the type of the elements of the
* DataStream.
*/
-public class DataStream<T extends Tuple> {
+public class DataStream<T> {
protected static Integer counter = 0;
protected final StreamExecutionEnvironment environment;
@@ -352,7 +352,7 @@ public class DataStream<T extends Tuple> {
* output type
* @return The transformed DataStream.
*/
- public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
+ public <R> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
}
@@ -372,7 +372,7 @@ public class DataStream<T extends Tuple> {
* {@link CoMapFunction#map2(Tuple)}
* @return The transformed DataStream
*/
- public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
+ public <T2, R> DataStream<R> coMapWith(
CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
coMapper,
@@ -394,7 +394,7 @@ public class DataStream<T extends Tuple> {
* output type
* @return The transformed DataStream.
*/
- public <R extends Tuple> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
+ public <R> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
}
@@ -430,7 +430,7 @@ public class DataStream<T extends Tuple> {
* output type
* @return The modified DataStream.
*/
- public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
+ public <R> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
@@ -453,7 +453,7 @@ public class DataStream<T extends Tuple> {
* output type
* @return The modified DataStream.
*/
- public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
+ public <R> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
long windowSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
@@ -476,7 +476,7 @@ public class DataStream<T extends Tuple> {
* type of the return stream
* @return the data stream constructed
*/
- private <R extends Tuple> StreamOperator<T, R> addFunction(String functionName,
+ private <R> StreamOperator<T, R> addFunction(String functionName,
final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
UserTaskInvokable<T, R> functionInvokable) {
@@ -500,7 +500,7 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
- protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(
+ protected <T1, T2, R> DataStream<R> addCoFunction(
String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
CoInvokable<T1, T2, R> functionInvokable) {
@@ -535,7 +535,7 @@ public class DataStream<T extends Tuple> {
* @param typeNumber
* Number of the type (used at co-functions)
*/
- <X extends Tuple> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+ <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
String inputID = inputStream.connectIDs.get(i);
StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
@@ -926,7 +926,7 @@ public class DataStream<T extends Tuple> {
return new IterativeDataStream<T>(this);
}
- protected <R extends Tuple> DataStream<T> addIterationSource(String iterationID) {
+ protected <R> DataStream<T> addIterationSource(String iterationID) {
DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index 1cfb625..bfce834 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.partitioner.ForwardPartitioner;
/**
@@ -29,7 +28,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
* @param <T>
* Type of the DataStream
*/
-public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
+public class IterativeDataStream<T> extends StreamOperator<T, T> {
static Integer iterationCount = 0;
@@ -69,7 +68,7 @@ public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
* when used with directed emits
*
*/
- public <R extends Tuple> DataStream<T> closeWith(DataStream<T> iterationResult,
+ public <R> DataStream<T> closeWith(DataStream<T> iterationResult,
String iterationName) {
DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3d49928..a102a00 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
@@ -74,12 +73,12 @@ public class StreamConfig {
// CONFIGS
public void setTypeWrapper(
- TypeSerializerWrapper<? extends Tuple, ? extends Tuple, ? extends Tuple> typeWrapper) {
+ TypeSerializerWrapper<?, ?, ?> typeWrapper) {
config.setBytes("typeWrapper", SerializationUtils.serialize(typeWrapper));
}
@SuppressWarnings("unchecked")
- public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
+ public <IN1, IN2, OUT> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
byte[] serializedWrapper = config.getBytes("typeWrapper", null);
if (serializedWrapper == null) {
@@ -106,7 +105,7 @@ public class StreamConfig {
return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
}
- public void setUserInvokable(StreamComponentInvokable<? extends Tuple> invokableObject) {
+ public void setUserInvokable(StreamComponentInvokable<?> invokableObject) {
if (invokableObject != null) {
config.setClass(USER_FUNCTION, invokableObject.getClass());
@@ -125,7 +124,7 @@ public class StreamConfig {
// return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
// }
- public <T extends Tuple> StreamComponentInvokable<T> getUserInvokableObject() {
+ public <T> StreamComponentInvokable<T> getUserInvokableObject() {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
@@ -186,7 +185,7 @@ public class StreamConfig {
}
}
- public <T extends Tuple> OutputSelector<T> getOutputSelector() {
+ public <T> OutputSelector<T> getOutputSelector() {
try {
return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
} catch (Exception e) {
@@ -211,14 +210,14 @@ public class StreamConfig {
return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
}
- public <T extends Tuple> void setPartitioner(int outputIndex,
+ public <T> void setPartitioner(int outputIndex,
StreamPartitioner<T> partitionerObject) {
config.setBytes(PARTITIONER_OBJECT + outputIndex,
SerializationUtils.serialize(partitionerObject));
}
- public <T extends Tuple> StreamPartitioner<T> getPartitioner(int outputIndex)
+ public <T> StreamPartitioner<T> getPartitioner(int outputIndex)
throws ClassNotFoundException, IOException {
return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
SerializationUtils.serialize(new ShufflePartitioner<T>())));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 4539126..0e77912 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
@@ -152,11 +151,11 @@ public abstract class StreamExecutionEnvironment {
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
- public DataStream<Tuple1<String>> readTextFile(String filePath) {
+ public DataStream<String> readTextFile(String filePath) {
return addSource(new FileSourceFunction(filePath), 1);
}
- public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
+ public DataStream<String> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
@@ -170,11 +169,11 @@ public abstract class StreamExecutionEnvironment {
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
- public DataStream<Tuple1<String>> readTextStream(String filePath) {
+ public DataStream<String> readTextStream(String filePath) {
return addSource(new FileStreamFunction(filePath), 1);
}
- public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
+ public DataStream<String> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
}
@@ -191,15 +190,14 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return The DataStream representing the elements.
*/
- public <X extends Serializable> DataStream<Tuple1<X>> fromElements(X... data) {
- DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
+ public <X extends Serializable> DataStream<X> fromElements(X... data) {
+ DataStream<X> returnStream = new DataStream<X>(this, "elements");
try {
- SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
- jobGraphBuilder.addSource(returnStream.getId(),
- new SourceInvokable<Tuple1<X>>(function),
- new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data[0], null, data[0]),
- "source", SerializationUtils.serialize(function), 1);
+ SourceFunction<X> function = new FromElementsFunction<X>(data);
+ jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<X>(function),
+ new ObjectTypeWrapper<X, Tuple, X>(data[0], null, data[0]), "source",
+ SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
@@ -218,22 +216,22 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return The DataStream representing the elements.
*/
- public <X extends Serializable> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
- DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
+ @SuppressWarnings("unchecked")
+ public <X extends Serializable> DataStream<X> fromCollection(Collection<X> data) {
+ DataStream<X> returnStream = new DataStream<X>(this, "elements");
if (data.isEmpty()) {
throw new RuntimeException("Collection must not be empty");
}
try {
- SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
-
- jobGraphBuilder
- .addSource(returnStream.getId(), new SourceInvokable<Tuple1<X>>(
- new FromElementsFunction<X>(data)),
- new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data.toArray()[0],
- null, data.toArray()[0]), "source", SerializationUtils
- .serialize(function), 1);
+ SourceFunction<X> function = new FromElementsFunction<X>(data);
+
+ jobGraphBuilder.addSource(
+ returnStream.getId(),
+ new SourceInvokable<X>(new FromElementsFunction<X>(data)),
+ new ObjectTypeWrapper<X, Tuple, X>((X) data.toArray()[0], null, (X) data
+ .toArray()[0]), "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
}
@@ -250,7 +248,7 @@ public abstract class StreamExecutionEnvironment {
* The number to stop at (inclusive)
* @return A DataStrean, containing all number in the [from, to] interval.
*/
- public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
+ public DataStream<Long> generateSequence(long from, long to) {
return addSource(new GenSequenceFunction(from, to), 1);
}
@@ -265,7 +263,7 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
- public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
+ public <T> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
try {
@@ -279,7 +277,7 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
- public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
+ public <T> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
return addSource(sourceFunction, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
index d8adb97..7edde1c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
@@ -19,9 +19,8 @@
package org.apache.flink.streaming.api;
-import org.apache.flink.api.java.tuple.Tuple;
-public class StreamOperator<IN extends Tuple, OUT extends Tuple> extends DataStream<OUT> {
+public class StreamOperator<IN, OUT > extends DataStream<OUT> {
protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
super(environment, operatorType);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index f968b83..73a5749 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -36,7 +35,7 @@ import org.apache.flink.util.StringUtils;
* @param <T>
* Type of the Tuple collected.
*/
-public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T> {
+public class DirectedStreamCollector<T> extends StreamCollector<T> {
OutputSelector<T> outputSelector;
private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
@@ -47,7 +46,7 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
* @param channelID
* Channel ID of the Task
* @param serializationDelegate
- * Serialization delegate used for tuple serialization
+ * Serialization delegate used for serialization
* @param outputSelector
* User defined {@link OutputSelector}
*/
@@ -63,12 +62,12 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
* Collects and emits a tuple to the outputs by reusing a StreamRecord
* object.
*
- * @param tuple
- * Tuple to be collected and emitted.
+ * @param outputObject
+ * Object to be collected and emitted.
*/
@Override
- public void collect(T tuple) {
- streamRecord.setTuple(tuple);
+ public void collect(T outputObject) {
+ streamRecord.setObject(outputObject);
emit(streamRecord);
}
@@ -80,7 +79,7 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
* Record to emit.
*/
private void emit(StreamRecord<T> streamRecord) {
- Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getTuple());
+ Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
streamRecord.setId(channelID);
serializationDelegate.setInstance(streamRecord);
for (String outputName : outputNames) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index c4262b6..6d63385 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -23,17 +23,15 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* Class for defining an OutputSelector for the directTo operator. Every output
- * tuple of a directed DataStream will run through this operator to select
+ * object of a directed DataStream will run through this operator to select
* outputs.
*
* @param <T>
- * Type parameter of the directed tuples.
+ * Type parameter of the directed tuples/objects.
*/
-public abstract class OutputSelector<T extends Tuple> implements Serializable {
+public abstract class OutputSelector<T> implements Serializable {
private static final long serialVersionUID = 1L;
private Collection<String> outputs;
@@ -42,21 +40,21 @@ public abstract class OutputSelector<T extends Tuple> implements Serializable {
outputs = new ArrayList<String>();
}
- Collection<String> getOutputs(T tuple) {
+ Collection<String> getOutputs(T outputObject) {
outputs.clear();
- select(tuple, outputs);
+ select(outputObject, outputs);
return outputs;
}
/**
- * Method for selecting output names for the emitted tuples when using the
+ * Method for selecting output names for the emitted objects when using the
* directTo operator. The tuple will be emitted only to output names which
* are added to the outputs collection.
*
- * @param tuple
- * Tuple for which the output selection should be made.
+ * @param outputObject
+ * Output object for which the output selection should be made.
* @param outputs
* Selected output names should be added to this collection.
*/
- public abstract void select(T tuple, Collection<String> outputs);
+ public abstract void select(T outputObject, Collection<String> outputs);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 544a695..4317f75 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -34,14 +33,14 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
/**
- * Collector for tuples in Apache Flink stream processing. The collected tuples
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
+ * Collector for tuples in Apache Flink stream processing. The collected
+ * tuples/objects will be wrapped with ID in a {@link StreamRecord} and then
+ * emitted to the outputs.
*
* @param <T>
- * Type of the Tuple collected.
+ * Type of the Tuples/Objects collected.
*/
-public class StreamCollector<T extends Tuple> implements Collector<T> {
+public class StreamCollector<T> implements Collector<T> {
private static final Log LOG = LogFactory.getLog(StreamCollector.class);
@@ -57,9 +56,10 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
* @param channelID
* Channel ID of the Task
* @param serializationDelegate
- * Serialization delegate used for tuple serialization
+ * Serialization delegate used for serialization
*/
- public StreamCollector(int channelID, SerializationDelegate<StreamRecord<T>> serializationDelegate) {
+ public StreamCollector(int channelID,
+ SerializationDelegate<StreamRecord<T>> serializationDelegate) {
this.serializationDelegate = serializationDelegate;
this.streamRecord = new StreamRecord<T>();
@@ -92,15 +92,15 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
}
/**
- * Collects and emits a tuple to the outputs by reusing a StreamRecord
- * object.
+ * Collects and emits a tuple/object to the outputs by reusing a
+ * StreamRecord object.
*
- * @param tuple
- * Tuple to be collected and emitted.
+ * @param outputObject
+ * Object to be collected and emitted.
*/
@Override
- public void collect(T tuple) {
- streamRecord.setTuple(tuple);
+ public void collect(T outputObject) {
+ streamRecord.setObject(outputObject);
emit(streamRecord);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index 5885cbf..6e4d877 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -19,9 +19,8 @@
package org.apache.flink.streaming.api.function.co;
import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-public abstract class CoMapFunction<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends AbstractFunction {
+public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractFunction {
private static final long serialVersionUID = 1L;
public abstract OUT map1(IN1 value);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 7918e48..026c18e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.function.sink;
-import org.apache.flink.api.java.tuple.Tuple;
/**
* Dummy implementation of the SinkFunction writing every tuple to the standard
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
* @param <IN>
* Input tuple type
*/
-public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+public class PrintSinkFunction<IN> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index cc4fb96..867c9f8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -22,12 +22,11 @@ package org.apache.flink.streaming.api.function.sink;
import java.io.Serializable;
import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
+public abstract class SinkFunction<IN> extends AbstractFunction implements Serializable {
private static final long serialVersionUID = 1L;
- public abstract void invoke(IN tuple);
+ public abstract void invoke(IN value);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
index 18853b3..3e93a97 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
@@ -22,8 +22,6 @@ package org.apache.flink.streaming.api.function.sink;
import java.io.Serializable;
import java.util.ArrayList;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* Abstract class for formatting the output of the writeAsText and writeAsCsv
* functions.
@@ -31,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple;
* @param <IN>
* Input tuple type
*/
-public abstract class WriteFormat<IN extends Tuple> implements Serializable {
+public abstract class WriteFormat<IN> implements Serializable {
private static final long serialVersionUID = 1L;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
index e10a9c8..5fa099f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -25,15 +25,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* Writes tuples in csv format.
*
* @param <IN>
* Input tuple type
*/
-public class WriteFormatAsCsv<IN extends Tuple> extends WriteFormat<IN> {
+public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
index 2d591ae..6a82877 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
@@ -25,15 +25,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* Writes tuples in text format.
*
* @param <IN>
* Input tuple type
*/
-public class WriteFormatAsText<IN extends Tuple> extends WriteFormat<IN> {
+public class WriteFormatAsText<IN> extends WriteFormat<IN> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index d473190..774dd63 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -23,8 +23,6 @@ import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* Simple implementation of the SinkFunction writing tuples as simple text to
* the file specified by path. Tuples are collected to a list and written to the
@@ -34,7 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple;
* @param <IN>
* Input tuple type
*/
-public abstract class WriteSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+public abstract class WriteSinkFunction<IN> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
protected final String path;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
index 3797d13..c860c52 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.function.sink;
-import org.apache.flink.api.java.tuple.Tuple;
/**
* Implementation of WriteSinkFunction. Writes tuples to file in equally sized
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
* @param <IN>
* Input tuple type
*/
-public class WriteSinkFunctionByBatches<IN extends Tuple> extends WriteSinkFunction<IN> {
+public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private final int batchSize;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index cb77e6d..9271f36 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.function.sink;
-import org.apache.flink.api.java.tuple.Tuple;
/**
* Implementation of WriteSinkFunction. Writes tuples to file in every millis
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
* @param <IN>
* Input tuple type
*/
-public class WriteSinkFunctionByMillis<IN extends Tuple> extends WriteSinkFunction<IN> {
+public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private final long millis;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index f6c2c72..3a732be 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -23,27 +23,24 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
-public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
+public class FileSourceFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
- private Tuple1<String> outTuple = new Tuple1<String>();
public FileSourceFunction(String path) {
this.path = path;
}
@Override
- public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+ public void invoke(Collector<String> collector) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (line != "") {
- outTuple.f0 = line;
- collector.collect(outTuple);
+ collector.collect(line);
}
line = br.readLine();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index edadfc3..9cfb2ce 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -23,28 +23,25 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
-public class FileStreamFunction extends SourceFunction<Tuple1<String>> {
+public class FileStreamFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
- private Tuple1<String> outTuple = new Tuple1<String>();
public FileStreamFunction(String path) {
this.path = path;
}
@Override
- public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+ public void invoke(Collector<String> collector) throws IOException {
while (true) {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (line != "") {
- outTuple.f0 = line;
- collector.collect(outTuple);
+ collector.collect(line);
}
line = br.readLine();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index dfe29d2..89f5182 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,14 +22,12 @@ package org.apache.flink.streaming.api.function.source;
import java.util.Arrays;
import java.util.Collection;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
-public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
+public class FromElementsFunction<T> extends SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
- Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsFunction(T... elements) {
this.iterable = Arrays.asList(elements);
@@ -40,10 +38,9 @@ public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
}
@Override
- public void invoke(Collector<Tuple1<T>> collector) throws Exception {
+ public void invoke(Collector<T> collector) throws Exception {
for (T element : iterable) {
- outTuple.f0 = element;
- collector.collect(outTuple);
+ collector.collect(element);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 706295e..d402374 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
-public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
+public class GenSequenceFunction extends SourceFunction<Long> {
private static final long serialVersionUID = 1L;
@@ -40,10 +40,9 @@ public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
}
@Override
- public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
+ public void invoke(Collector<Long> collector) throws Exception {
for (long i = from; i <= to; i++) {
- outTuple.f0 = i;
- collector.collect(outTuple);
+ collector.collect(i);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 971533f..01d4dac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -20,10 +20,9 @@
package org.apache.flink.streaming.api.function.source;
import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;
-public abstract class SourceFunction<OUT extends Tuple> extends AbstractFunction {
+public abstract class SourceFunction<OUT> extends AbstractFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 81cfa81..92b1ea6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,10 +19,9 @@
package org.apache.flink.streaming.api.invokable;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
private SinkFunction<IN> sinkFunction;
@@ -33,16 +32,16 @@ public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, I
@Override
protected void immutableInvoke() throws Exception {
- while (recordIterator.next(reuse) != null) {
- sinkFunction.invoke((IN) reuse.getTuple());
+ while ((reuse = recordIterator.next(reuse)) != null) {
+ sinkFunction.invoke((IN) reuse.getObject());
resetReuse();
}
}
@Override
protected void mutableInvoke() throws Exception {
- while (recordIterator.next(reuse) != null) {
- sinkFunction.invoke((IN) reuse.getTuple());
+ while ((reuse = recordIterator.next(reuse)) != null) {
+ sinkFunction.invoke((IN) reuse.getObject());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 992a25e..c7f0f09 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-public class SourceInvokable<OUT extends Tuple> extends StreamComponentInvokable<OUT> implements
+public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements
Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index daa7378..c011284 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;
-public abstract class StreamComponentInvokable<OUT extends Tuple> implements Serializable {
+public abstract class StreamComponentInvokable<OUT> implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 5be3c30..b1cdde1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,13 +21,12 @@ package org.apache.flink.streaming.api.invokable;
import java.io.IOException;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
+public abstract class StreamRecordInvokable<IN, OUT> extends
StreamComponentInvokable<OUT> {
private static final long serialVersionUID = 1L;