You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:10 UTC
[33/51] [abbrv] git commit: [streaming] Updated Streaming function
interfaces to match main project
[streaming] Updated Streaming function interfaces to match main project
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e73ea295
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e73ea295
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e73ea295
Branch: refs/heads/master
Commit: e73ea29593cdb8b4d9a11137b2188ca72673e98c
Parents: 0465d30
Author: gyfora <gy...@gmail.com>
Authored: Mon Aug 4 14:05:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 12 +-
.../streaming/connectors/flume/FlumeSource.java | 4 +-
.../streaming/connectors/kafka/KafkaSink.java | 12 +-
.../streaming/connectors/kafka/KafkaSource.java | 4 +-
.../connectors/kafka/KafkaTopology.java | 2 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 12 +-
.../connectors/rabbitmq/RMQSource.java | 4 +-
.../connectors/twitter/TwitterSource.java | 2 +-
.../connectors/twitter/TwitterStreaming.java | 33 +++---
.../streaming/connectors/rabbitmq/RMQTest.java | 46 ++++----
.../apache/flink/streaming/api/DataStream.java | 110 +++++++++++--------
.../streaming/api/collector/OutputSelector.java | 4 +-
.../api/function/co/CoMapFunction.java | 13 ++-
.../api/function/co/RichCoMapFunction.java | 27 +++++
.../api/function/sink/PrintSinkFunction.java | 2 +-
.../api/function/sink/RichSinkFunction.java | 30 +++++
.../api/function/sink/SinkFunction.java | 6 +-
.../api/function/sink/WriteSinkFunction.java | 2 +-
.../api/function/source/FileSourceFunction.java | 2 +-
.../api/function/source/FileStreamFunction.java | 2 +-
.../function/source/FromElementsFunction.java | 2 +-
.../function/source/GenSequenceFunction.java | 2 +-
.../api/function/source/RichSourceFunction.java | 29 +++++
.../api/function/source/SourceFunction.java | 10 +-
.../streaming/api/invokable/SinkInvokable.java | 9 +-
.../api/invokable/SourceInvokable.java | 9 +-
.../operator/BatchReduceInvokable.java | 4 +-
.../api/invokable/operator/FilterInvokable.java | 15 ++-
.../invokable/operator/FlatMapInvokable.java | 15 ++-
.../api/invokable/operator/MapInvokable.java | 15 ++-
.../operator/StreamReduceInvokable.java | 13 ++-
.../operator/WindowReduceInvokable.java | 4 +-
.../invokable/operator/co/CoMapInvokable.java | 9 +-
.../api/streamcomponent/CoStreamTask.java | 4 +-
.../util/serialization/FunctionTypeWrapper.java | 26 ++---
.../apache/flink/streaming/api/IterateTest.java | 2 +-
.../flink/streaming/api/WriteAsCsvTest.java | 2 +-
.../flink/streaming/api/WriteAsTextTest.java | 2 +-
.../api/collector/DirectedOutputTest.java | 4 +-
.../api/invokable/operator/BatchReduceTest.java | 10 +-
.../api/invokable/operator/CoMapTest.java | 4 +-
.../api/invokable/operator/FilterTest.java | 6 +-
.../api/invokable/operator/FlatMapTest.java | 16 +--
.../api/invokable/operator/MapTest.java | 36 +++---
.../streamcomponent/StreamComponentTest.java | 4 +-
.../examples/basictopology/BasicTopology.java | 6 +-
.../examples/cellinfo/CellInfoLocal.java | 4 +-
.../CollaborativeFilteringSink.java | 2 +-
.../CollaborativeFilteringSource.java | 2 +-
.../examples/iterative/kmeans/KMeansSink.java | 2 +-
.../examples/iterative/kmeans/KMeansSource.java | 2 +-
.../iterative/pagerank/PageRankSink.java | 2 +-
.../iterative/pagerank/PageRankSource.java | 2 +-
.../examples/iterative/sssp/SSSPSink.java | 2 +-
.../examples/iterative/sssp/SSSPSource.java | 2 +-
.../flink/streaming/examples/join/JoinSink.java | 2 +-
.../streaming/examples/join/JoinSourceOne.java | 2 +-
.../streaming/examples/join/JoinSourceTwo.java | 2 +-
.../ml/IncrementalLearningSkeleton.java | 4 +-
.../streaming/examples/ml/IncrementalOLS.java | 4 +-
.../window/join/WindowJoinSourceOne.java | 2 +-
.../window/join/WindowJoinSourceTwo.java | 2 +-
.../examples/wordcount/WordCountCounter.java | 6 +-
.../examples/wordcount/WordCountSplitter.java | 6 +-
.../testdata_checksum/ASTopology.data.md5 | 1 +
.../testdata_checksum/MovieLens100k.data.md5 | 1 +
.../resources/testdata_checksum/hamlet.txt.md5 | 1 +
.../testdata_checksum/terainput.txt.md5 | 1 +
68 files changed, 389 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 6f943d1..69e34e6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -30,7 +30,7 @@ import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
-public abstract class FlumeSink<IN> extends SinkFunction<IN> {
+public abstract class FlumeSink<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(FlumeSink.class);
@@ -51,18 +51,18 @@ public abstract class FlumeSink<IN> extends SinkFunction<IN> {
* Receives tuples from the Apache Flink {@link DataStream} and forwards them to
* Apache Flume.
*
- * @param tuple
+ * @param value
* The tuple arriving from the datastream
*/
@Override
- public void invoke(IN tuple) {
+ public void invoke(IN value) {
if (!initDone) {
client = new FlinkRpcClientFacade();
client.init(host, port);
}
- byte[] data = serialize(tuple);
+ byte[] data = serialize(value);
if (!closeWithoutSend) {
client.sendDataToFlume(data);
}
@@ -75,11 +75,11 @@ public abstract class FlumeSink<IN> extends SinkFunction<IN> {
/**
* Serializes tuples into byte arrays.
*
- * @param tuple
+ * @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
- public abstract byte[] serialize(IN tuple);
+ public abstract byte[] serialize(IN value);
private class FlinkRpcClientFacade {
private RpcClient client;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index b141efb..c296319 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.connectors.flume;
import java.util.List;
import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
@@ -30,7 +30,7 @@ import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;
-public abstract class FlumeSource<OUT> extends SourceFunction<OUT> {
+public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
String host;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 955e8dc..183860e 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -27,7 +27,7 @@ import kafka.producer.ProducerConfig;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
+public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private kafka.javaapi.producer.Producer<Integer, OUT> producer;
@@ -62,16 +62,16 @@ public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
*
- * @param tuple
+ * @param value
* The incoming data
*/
@Override
- public void invoke(IN tuple) {
+ public void invoke(IN value) {
if (!initDone) {
initialize();
}
- OUT out = serialize(tuple);
+ OUT out = serialize(value);
KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
if (!closeWithoutSend) {
@@ -86,11 +86,11 @@ public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
/**
* Serializes tuples into byte arrays.
*
- * @param tuple
+ * @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
- public abstract OUT serialize(IN tuple);
+ public abstract OUT serialize(IN value);
/**
* Closes the connection immediately and no further data will be sent.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 228069a..6e18b20 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -30,10 +30,10 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
-public abstract class KafkaSource<OUT> extends SourceFunction<OUT> {
+public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private final String zkQuorum;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index d605fb8..295f1cc 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
public class KafkaTopology {
- public static final class MySource extends SourceFunction<Tuple1<String>> {
+ public static final class MySource implements SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index c6f0ef5..2e3a8a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -29,7 +29,7 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-public abstract class RMQSink<IN> extends SinkFunction<IN> {
+public abstract class RMQSink<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(RMQSink.class);
@@ -69,18 +69,18 @@ public abstract class RMQSink<IN> extends SinkFunction<IN> {
/**
* Called when new data arrives to the sink, and forwards it to RMQ.
*
- * @param tuple
+ * @param value
* The incoming data
*/
@Override
- public void invoke(IN tuple) {
+ public void invoke(IN value) {
if (!initDone) {
initializeConnection();
}
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- byte[] msg = serialize(tuple);
+ byte[] msg = serialize(value);
if (!closeWithoutSend) {
channel.basicPublish("", QUEUE_NAME, null, msg);
}
@@ -98,11 +98,11 @@ public abstract class RMQSink<IN> extends SinkFunction<IN> {
/**
* Serializes tuples into byte arrays.
*
- * @param tuple
+ * @param value
* The tuple used for the serialization
* @return The serialized byte array.
*/
- public abstract byte[] serialize(IN tuple);
+ public abstract byte[] serialize(IN value);
/**
* Closes the connection.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 8303b1a..fa0be0d 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import com.rabbitmq.client.Channel;
@@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
-public abstract class RMQSource<OUT> extends SourceFunction<OUT> {
+public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(RMQSource.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index bc0995d..17e3b02 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -44,7 +44,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
* Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
* It can connect to Twitter Streaming API, collect tweets and
*/
-public class TwitterSource extends SourceFunction<String> {
+public class TwitterSource implements SourceFunction<String> {
private static final Log LOG = LogFactory.getLog(TwitterSource.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index ee986ea..6a464ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -31,7 +31,8 @@ public class TwitterStreaming {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
- public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+ public static class TwitterSink implements
+ SinkFunction<Tuple5<Long, Long, String, String, String>> {
private static final long serialVersionUID = 1L;
@@ -44,40 +45,38 @@ public class TwitterStreaming {
}
}
-
+
public static class SelectDataFlatMap extends
JSONParseFlatMap<String, Tuple5<Long, Long, String, String, String>> {
private static final long serialVersionUID = 1L;
@Override
- public void flatMap(String value,
- Collector<Tuple5<Long, Long, String, String, String>> out)
+ public void flatMap(String value, Collector<Tuple5<Long, Long, String, String, String>> out)
throws Exception {
out.collect(new Tuple5<Long, Long, String, String, String>(
convertDateString2Long(getField(value, "id")),
convertDateString2LongDate(getField(value, "created_at")),
- colationOfNull(getField(value, "user.name")),
- colationOfNull(getField(value, "text")),
- getField(value, "lang")));
+ colationOfNull(getField(value, "user.name")), colationOfNull(getField(value,
+ "text")), getField(value, "lang")));
}
-
- protected String colationOfNull(String in){
- if(in==null){
+
+ protected String colationOfNull(String in) {
+ if (in == null) {
return " ";
}
return in;
}
-
+
protected Long convertDateString2LongDate(String dateString) {
- if (dateString!=(null)) {
+ if (dateString != (null)) {
String[] dateArray = dateString.split(" ");
- return Long.parseLong(dateArray[2])*100000+Long.parseLong(dateArray[5]);
+ return Long.parseLong(dateArray[2]) * 100000 + Long.parseLong(dateArray[5]);
}
return 0L;
}
-
+
protected Long convertDateString2Long(String dateString) {
if (dateString != null) {
return Long.parseLong(dateString);
@@ -87,14 +86,14 @@ public class TwitterStreaming {
}
public static void main(String[] args) {
-
+
String path = "/home/eszes/git/auth.properties";
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- DataStream<String> streamSource = env.addSource(
- new TwitterSource(path,100), SOURCE_PARALLELISM);
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
+ SOURCE_PARALLELISM);
DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
.flatMap(new SelectDataFlatMap());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
index c6a43f2..ad704d7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-
import java.util.HashSet;
import java.util.Set;
@@ -28,8 +27,8 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.junit.Test;
public class RMQTest {
-
- public static final class MySink extends SinkFunction<Tuple1<String>> {
+
+ public static final class MySink implements SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -37,38 +36,37 @@ public class RMQTest {
result.add(tuple.f0);
}
-
}
-
+
private static Set<String> expected = new HashSet<String>();
private static Set<String> result = new HashSet<String>();
-
+
@SuppressWarnings("unused")
- private static void fillExpected() {
+ private static void fillExpected() {
expected.add("one");
expected.add("two");
expected.add("three");
expected.add("four");
expected.add("five");
}
-
+
@Test
public void RMQTest1() throws Exception {
-//
-// StreamExecutionEnvironment env = new StreamExecutionEnvironment();
-//
-// DataStream<Tuple1<String>> dataStream1 = env
-// .addSource(new RMQSource("localhost", "hello"), 1)
-// .addSink(new MySink());
-//
-// DataStream<Tuple1<String>> dataStream2 = env
-// .fromElements("one", "two", "three", "four", "five", "q")
-// .addSink(new RMQSink("localhost", "hello"));
-//
-// env.execute();
-//
-// fillExpected();
-//
-// assertEquals(expected, result);
+ //
+ // StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+ //
+ // DataStream<Tuple1<String>> dataStream1 = env
+ // .addSource(new RMQSource("localhost", "hello"), 1)
+ // .addSink(new MySink());
+ //
+ // DataStream<Tuple1<String>> dataStream2 = env
+ // .fromElements("one", "two", "three", "four", "five", "q")
+ // .addSink(new RMQSink("localhost", "hello"));
+ //
+ // env.execute();
+ //
+ // fillExpected();
+ //
+ // assertEquals(expected, result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 430f09b..7aff259 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -19,19 +19,24 @@
package org.apache.flink.streaming.api;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -184,7 +189,8 @@ public class DataStream<T> {
* Sets the mutability of the operator represented by the DataStream. If the
* operator is set to mutable, the tuples received in the user defined
* functions, will be reused after the function call. Setting an operator to
- * mutable greatly reduces garbage collection overhead and thus scalability.
+ * mutable reduces garbage collection overhead and thus increases
+ * scalability.
*
* @param isMutable
* The mutability of the operator.
@@ -309,38 +315,42 @@ public class DataStream<T> {
/**
* Applies a Map transformation on a {@link DataStream}. The transformation
- * calls a {@link RichMapFunction} for each element of the DataStream. Each
- * MapFunction call returns exactly one element.
+ * calls a {@link MapFunction} for each element of the DataStream. Each
+ * MapFunction call returns exactly one element. The user can also extend
+ * {@link RichMapFunction} to gain access to other features provided by the
+ * {@link RichFuntion} interface.
*
* @param mapper
- * The RichMapFunction that is called for each element of the
+ * The MapFunction that is called for each element of the
* DataStream.
* @param <R>
* output type
* @return The transformed DataStream.
*/
- public <R> StreamOperator<R> map(RichMapFunction<T, R> mapper) {
+ public <R> StreamOperator<R> map(MapFunction<T, R> mapper) {
return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
- RichMapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
+ MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
}
/**
* Applies a FlatMap transformation on a {@link DataStream}. The
- * transformation calls a {@link RichFlatMapFunction} for each element of
- * the DataStream. Each RichFlatMapFunction call can return any number of
- * elements including none.
+ * transformation calls a {@link FlatMapFunction} for each element of the
+ * DataStream. Each FlatMapFunction call can return any number of elements
+ * including none. The user can also extend {@link RichFlatMapFunction} to
+ * gain access to other features provided by the {@link RichFuntion}
+ * interface.
*
* @param flatMapper
- * The RichFlatMapFunction that is called for each element of the
+ * The FlatMapFunction that is called for each element of the
* DataStream
*
* @param <R>
* output type
* @return The transformed DataStream.
*/
- public <R> StreamOperator<R> flatMap(RichFlatMapFunction<T, R> flatMapper) {
+ public <R> StreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
- RichFlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
+ FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
}
/**
@@ -348,7 +358,9 @@ public class DataStream<T> {
* transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
* of the first DataStream (on which .coMapWith was called) and
* {@link CoMapFunction#map2(Tuple)} for each element of the second
- * DataStream. Each CoMapFunction call returns exactly one element.
+ * DataStream. Each CoMapFunction call returns exactly one element. The user
+ * can also extend {@link RichCoMapFunction} to gain access to other
+ * features provided by the {@link RichFuntion} interface.
*
* @param coMapper
* The CoMapFunction used to jointly transform the two input
@@ -367,63 +379,67 @@ public class DataStream<T> {
/**
* Applies a reduce transformation on preset chunks of the DataStream. The
- * transformation calls a {@link RichGroupReduceFunction} for each tuple
- * batch of the predefined size. Each RichGroupReduceFunction call can
- * return any number of elements including none.
+ * transformation calls a {@link GroupReduceFunction} for each tuple batch
+ * of the predefined size. Each GroupReduceFunction call can return any
+ * number of elements including none. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFuntion} interface.
*
*
* @param reducer
- * The RichGroupReduceFunction that is called for each tuple
- * batch.
+ * The GroupReduceFunction that is called for each tuple batch.
* @param batchSize
* The number of tuples grouped together in the batch.
* @param <R>
* output type
* @return The modified DataStream.
*/
- public <R> StreamOperator<R> batchReduce(RichGroupReduceFunction<T, R> reducer, int batchSize) {
+ public <R> StreamOperator<R> batchReduce(GroupReduceFunction<T, R> reducer, int batchSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
- RichGroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
batchSize));
}
/**
* Applies a reduce transformation on preset "time" chunks of the
- * DataStream. The transformation calls a {@link RichGroupReduceFunction} on
+ * DataStream. The transformation calls a {@link GroupReduceFunction} on
* records received during the predefined time window. The window shifted
- * after each reduce call. Each RichGroupReduceFunction call can return any
- * number of elements including none.
+ * after each reduce call. Each GroupReduceFunction call can return any
+ * number of elements including none.The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFuntion} interface.
*
*
* @param reducer
- * The RichGroupReduceFunction that is called for each time
- * window.
+ * The GroupReduceFunction that is called for each time window.
* @param windowSize
* The time window to run the reducer on, in milliseconds.
* @param <R>
* output type
* @return The modified DataStream.
*/
- public <R> StreamOperator<R> windowReduce(RichGroupReduceFunction<T, R> reducer, long windowSize) {
+ public <R> StreamOperator<R> windowReduce(GroupReduceFunction<T, R> reducer, long windowSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
- RichGroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
windowSize));
}
/**
* Applies a Filter transformation on a {@link DataStream}. The
- * transformation calls a {@link RichFilterFunction} for each element of the
+ * transformation calls a {@link FilterFunction} for each element of the
* DataStream and retains only those element for which the function returns
- * true. Elements for which the function returns false are filtered.
+ * true. Elements for which the function returns false are filtered. The
+ * user can also extend {@link RichFilterFunction} to gain access to other
+ * features provided by the {@link RichFuntion} interface.
*
* @param filter
- * The RichFilterFunction that is called for each element of the
+ * The FilterFunction that is called for each element of the
* DataSet.
* @return The filtered DataStream.
*/
- public StreamOperator<T> filter(RichFilterFunction<T> filter) {
+ public StreamOperator<T> filter(FilterFunction<T> filter) {
return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
- RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
+ FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
}
/**
@@ -745,14 +761,15 @@ public class DataStream<T> {
/**
* Initiates an iterative part of the program that executes multiple times
* and feeds back data streams. The iterative part needs to be closed by
- * calling {@link IterativeDataStream#closeWith(DataStream)}. The data
- * stream given to the {@code closeWith(DataStream)} method is the data
- * stream that will be fed back and used as the input for the iteration
+ * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+ * transformation of this IterativeDataStream will be the iteration head.
+ * The data stream given to the {@code closeWith(DataStream)} method is the
+ * data stream that will be fed back and used as the input for the iteration
* head. Unlike in batch processing by default the output of the iteration
* stream is directed to both to the iteration head and the next component.
* To direct tuples to the iteration head or the output specifically one can
- * use the {@code directTo(OutputSelector)} while referencing the iteration
- * head as 'iterate'.
+ * use the {@code split(OutputSelector)} on the iteration tail while
+ * referencing the iteration head as 'iterate'.
*
* The iteration edge will be partitioned the same way as the first input of
* the iteration head.
@@ -786,8 +803,8 @@ public class DataStream<T> {
* type of the return stream
* @return the data stream constructed
*/
- private <R> StreamOperator<R> addFunction(String functionName,
- final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+ private <R> StreamOperator<R> addFunction(String functionName, final Function function,
+ TypeSerializerWrapper<T, Tuple, R> typeWrapper,
UserTaskInvokable<T, R> functionInvokable) {
DataStream<T> inputStream = this.copy();
@@ -796,7 +813,8 @@ public class DataStream<T> {
try {
jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
- functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ functionName, SerializationUtils.serialize((Serializable) function),
+ degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
@@ -816,16 +834,16 @@ public class DataStream<T> {
}
protected <T1, T2, R> StreamOperator<R> addCoFunction(String functionName,
- DataStream<T1> inputStream1, DataStream<T2> inputStream2,
- final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
- CoInvokable<T1, T2, R> functionInvokable) {
+ DataStream<T1> inputStream1, DataStream<T2> inputStream2, final Function function,
+ TypeSerializerWrapper<T1, T2, R> typeWrapper, CoInvokable<T1, T2, R> functionInvokable) {
StreamOperator<R> returnStream = new TwoInputStreamOperator<T1, T2, R>(environment,
functionName);
try {
jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
- functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ functionName, SerializationUtils.serialize((Serializable) function),
+ degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 6d63385..798d8fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -51,10 +51,10 @@ public abstract class OutputSelector<T> implements Serializable {
* directTo operator. The tuple will be emitted only to output names which
* are added to the outputs collection.
*
- * @param outputObject
+ * @param value
* Output object for which the output selection should be made.
* @param outputs
* Selected output names should be added to this collection.
*/
- public abstract void select(T outputObject, Collection<String> outputs);
+ public abstract void select(T value, Collection<String> outputs);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index 8404a80..d1ef3d0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -18,12 +18,13 @@
package org.apache.flink.streaming.api.function.co;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import java.io.Serializable;
+import org.apache.flink.api.common.functions.Function;
-public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction {
- private static final long serialVersionUID = 1L;
-
- public abstract OUT map1(IN1 value);
- public abstract OUT map2(IN2 value);
+public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+ public OUT map1(IN1 value);
+
+ public OUT map2(IN2 value);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
new file mode 100755
index 0000000..c93fc81
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+ CoMapFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 026c18e..4728800 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -27,7 +27,7 @@ package org.apache.flink.streaming.api.function.sink;
* @param <IN>
* Input tuple type
*/
-public class PrintSinkFunction<IN> extends SinkFunction<IN> {
+public class PrintSinkFunction<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
new file mode 100755
index 0000000..4bbbdc4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ public abstract void invoke(IN value);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 668837f..24f45e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,11 +21,9 @@ package org.apache.flink.streaming.api.function.sink;
import java.io.Serializable;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
-public abstract class SinkFunction<IN> extends AbstractRichFunction implements Serializable {
-
- private static final long serialVersionUID = 1L;
+public interface SinkFunction<IN> extends Function, Serializable {
public abstract void invoke(IN value);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index 774dd63..1cfcfaf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -32,7 +32,7 @@ import java.util.ArrayList;
* @param <IN>
* Input tuple type
*/
-public abstract class WriteSinkFunction<IN> extends SinkFunction<IN> {
+public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
protected final String path;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 3a732be..6c8cd3a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.apache.flink.util.Collector;
-public class FileSourceFunction extends SourceFunction<String> {
+public class FileSourceFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 9cfb2ce..799e700 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.apache.flink.util.Collector;
-public class FileStreamFunction extends SourceFunction<String> {
+public class FileStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 89f5182..98e012b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import org.apache.flink.util.Collector;
-public class FromElementsFunction<T> extends SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index d402374..ad7586c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
-public class GenSequenceFunction extends SourceFunction<Long> {
+public class GenSequenceFunction implements SourceFunction<Long> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
new file mode 100755
index 0000000..94311a1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
+ SourceFunction<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 44e3387..0bdd7d6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -19,12 +19,12 @@
package org.apache.flink.streaming.api.function.source;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
+import java.io.Serializable;
-public abstract class SourceFunction<OUT> extends AbstractRichFunction {
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
- private static final long serialVersionUID = 1L;
+public interface SourceFunction<OUT> extends Function, Serializable {
- public abstract void invoke(Collector<OUT> collector) throws Exception;
+ public void invoke(Collector<OUT> collector) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index b733362..50bdd42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.invokable;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -48,11 +49,15 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@Override
public void open(Configuration parameters) throws Exception {
- sinkFunction.open(parameters);
+ if (sinkFunction instanceof RichFunction) {
+ ((RichFunction) sinkFunction).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- sinkFunction.close();
+ if (sinkFunction instanceof RichFunction) {
+ ((RichFunction) sinkFunction).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index d049bbf..d7710ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.source.SourceFunction;
@@ -43,11 +44,15 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
@Override
public void open(Configuration parameters) throws Exception {
- sourceFunction.open(parameters);
+ if (sourceFunction instanceof RichFunction) {
+ ((RichFunction) sourceFunction).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- sourceFunction.close();
+ if (sourceFunction instanceof RichFunction) {
+ ((RichFunction) sourceFunction).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 61ba5a9..c3c861b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -22,13 +22,13 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private int batchSize;
- public BatchReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
+ public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
this.reducer = reduceFunction;
this.batchSize = batchSize;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 56ab680..b64f08b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -19,7 +19,8 @@
package org.apache.flink.streaming.api.invokable.operator;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
@@ -27,9 +28,9 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
- RichFilterFunction<IN> filterFunction;
+ FilterFunction<IN> filterFunction;
- public FilterInvokable(RichFilterFunction<IN> filterFunction) {
+ public FilterInvokable(FilterFunction<IN> filterFunction) {
this.filterFunction = filterFunction;
}
@@ -54,11 +55,15 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
@Override
public void open(Configuration parameters) throws Exception {
- filterFunction.open(parameters);
+ if (filterFunction instanceof RichFunction) {
+ ((RichFunction) filterFunction).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- filterFunction.close();
+ if (filterFunction instanceof RichFunction) {
+ ((RichFunction) filterFunction).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 7796230..cc9fcc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -19,16 +19,17 @@
package org.apache.flink.streaming.api.invokable.operator;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
- private RichFlatMapFunction<IN, OUT> flatMapper;
+ private FlatMapFunction<IN, OUT> flatMapper;
- public FlatMapInvokable(RichFlatMapFunction<IN, OUT> flatMapper) {
+ public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
this.flatMapper = flatMapper;
}
@@ -49,11 +50,15 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
public void open(Configuration parameters) throws Exception {
- flatMapper.open(parameters);
+ if (flatMapper instanceof RichFunction) {
+ ((RichFunction) flatMapper).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- flatMapper.close();
+ if (flatMapper instanceof RichFunction) {
+ ((RichFunction) flatMapper).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 23fc31e..9dbb678 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -19,16 +19,17 @@
package org.apache.flink.streaming.api.invokable.operator;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
- private RichMapFunction<IN, OUT> mapper;
+ private MapFunction<IN, OUT> mapper;
- public MapInvokable(RichMapFunction<IN, OUT> mapper) {
+ public MapInvokable(MapFunction<IN, OUT> mapper) {
this.mapper = mapper;
}
@@ -49,11 +50,15 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
public void open(Configuration parameters) throws Exception {
- mapper.open(parameters);
+ if (mapper instanceof RichFunction) {
+ ((RichFunction) mapper).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- mapper.close();
+ if (mapper instanceof RichFunction) {
+ ((RichFunction) mapper).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 1a402a1..548a298 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -21,25 +21,30 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.Iterator;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
- protected RichGroupReduceFunction<IN, OUT> reducer;
+ protected GroupReduceFunction<IN, OUT> reducer;
protected BatchIterator<IN> userIterator;
protected BatchIterable userIterable;
@Override
public void open(Configuration parameters) throws Exception {
userIterable = new BatchIterable();
- reducer.open(parameters);
+ if (reducer instanceof RichFunction) {
+ ((RichFunction) reducer).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- reducer.close();
+ if (reducer instanceof RichFunction) {
+ ((RichFunction) reducer).close();
+ }
}
protected class BatchIterable implements Iterable<IN> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 430a68e..309656b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
@@ -30,7 +30,7 @@ public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OU
volatile boolean isRunning;
boolean window;
- public WindowReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+ public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
this.reducer = reduceFunction;
this.windowSize = windowSize;
this.window = true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index ac71b22..cd51081 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.invokable.operator.co;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
@@ -56,12 +57,16 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@Override
public void open(Configuration parameters) throws Exception {
- mapper.open(parameters);
+ if (mapper instanceof RichFunction) {
+ ((RichFunction) mapper).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- mapper.close();
+ if (mapper instanceof RichFunction) {
+ ((RichFunction) mapper).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 0e03915..c06e664 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -82,7 +82,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void setDeserializers(Object function, Class<? extends AbstractRichFunction> clazz) {
+ private void setDeserializers(Object function, Class<? extends Function> clazz) {
TypeInformation<IN1> inputTypeInfo1 = (TypeInformation<IN1>) typeWrapper
.getInputTypeInfo1();
inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index 54471ae..2ac6a47 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -21,22 +21,20 @@ package org.apache.flink.streaming.util.serialization;
import java.io.IOException;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-public class FunctionTypeWrapper<IN1, IN2, OUT> extends
- TypeSerializerWrapper<IN1, IN2, OUT> {
+public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- private AbstractRichFunction function;
- private Class<? extends AbstractRichFunction> functionSuperClass;
+ private Function function;
+ private Class<? extends Function> functionSuperClass;
private int inTypeParameter1;
private int inTypeParameter2;
private int outTypeParameter;
- public FunctionTypeWrapper(AbstractRichFunction function,
- Class<? extends AbstractRichFunction> functionSuperClass, int inTypeParameter1,
- int inTypeParameter2, int outTypeParameter) {
+ public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
+ int inTypeParameter1, int inTypeParameter2, int outTypeParameter) {
this.function = function;
this.functionSuperClass = functionSuperClass;
this.inTypeParameter1 = inTypeParameter1;
@@ -54,18 +52,18 @@ public class FunctionTypeWrapper<IN1, IN2, OUT> extends
@Override
protected void setTypeInfo() {
if (inTypeParameter1 != -1) {
- inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass,
- function.getClass(), inTypeParameter1, null, null);
+ inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+ inTypeParameter1, null, null);
}
if (inTypeParameter2 != -1) {
- inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass,
- function.getClass(), inTypeParameter2, null, null);
+ inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+ inTypeParameter2, null, null);
}
if (outTypeParameter != -1) {
- outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass,
- function.getClass(), outTypeParameter, null, null);
+ outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+ outTypeParameter, null, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 68403a8..2b3edc2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -64,7 +64,7 @@ public class IterateTest {
}
- public static final class MySink extends SinkFunction<Boolean> {
+ public static final class MySink implements SinkFunction<Boolean> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index e296733..28cbc6e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -48,7 +48,7 @@ public class WriteAsCsvTest {
private static List<String> expected4 = new ArrayList<String>();
private static List<String> expected5 = new ArrayList<String>();
- public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
+ public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 0f22262..337ca4e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -50,7 +50,7 @@ public class WriteAsTextTest {
private static List<String> expected4 = new ArrayList<String>();
private static List<String> expected5 = new ArrayList<String>();
- public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
+ public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e2991b4..f2c647f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -51,7 +51,7 @@ public class DirectedOutputTest {
}
}
- private static class EvenSink extends SinkFunction<Long> {
+ private static class EvenSink implements SinkFunction<Long> {
private static final long serialVersionUID = 1L;
@@ -61,7 +61,7 @@ public class DirectedOutputTest {
}
}
- private static class OddSink extends SinkFunction<Long> {
+ private static class OddSink implements SinkFunction<Long> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index c23c9a7..49f4509 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.LocalStreamEnvironment;
@@ -42,8 +42,8 @@ public class BatchReduceTest {
private static final int PARALlELISM = 1;
private static final long MEMORYSIZE = 32;
- public static final class MyBatchReduce extends
- RichGroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
+ public static final class MyBatchReduce implements
+ GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
@@ -62,7 +62,7 @@ public class BatchReduceTest {
}
}
- public static final class MySink extends SinkFunction<Tuple1<Double>> {
+ public static final class MySink implements SinkFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
@@ -72,7 +72,7 @@ public class BatchReduceTest {
}
- public static final class MySource extends SourceFunction<Tuple1<Double>> {
+ public static final class MySource implements SourceFunction<Tuple1<Double>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 2c3f480..82a5f89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -39,7 +39,7 @@ public class CoMapTest implements Serializable {
private static Set<String> result;
private static Set<String> expected = new HashSet<String>();
- private final static class EmptySink extends SinkFunction<Boolean> {
+ private final static class EmptySink implements SinkFunction<Boolean> {
private static final long serialVersionUID = 1L;
@Override
@@ -47,7 +47,7 @@ public class CoMapTest implements Serializable {
}
}
- private final static class MyCoMap extends
+ private final static class MyCoMap implements
CoMapFunction<String, Integer, Boolean> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 2d4fe7a..0cba0bf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.LogUtils;
@@ -36,7 +36,7 @@ public class FilterTest implements Serializable {
private static Set<Integer> set = new HashSet<Integer>();
- private static class MySink extends SinkFunction<Integer> {
+ private static class MySink implements SinkFunction<Integer> {
private static final long serialVersionUID = 1L;
@Override
@@ -45,7 +45,7 @@ public class FilterTest implements Serializable {
}
}
- static class MyFilter extends RichFilterFunction<Integer> {
+ static class MyFilter implements FilterFunction<Integer> {
private static final long serialVersionUID = 1L;
@Override