You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/22 18:35:25 UTC
[1/4] flink git commit: [FLINK-1688] [streaming] Socket client sink
added
Repository: flink
Updated Branches:
refs/heads/master 244e5d5f8 -> 35f34162a
[FLINK-1688] [streaming] Socket client sink added
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3bc7852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3bc7852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3bc7852
Branch: refs/heads/master
Commit: a3bc785232d3071ac5cb76ab20db9c30a3d2a22a
Parents: 244e5d5
Author: miguel0afd <ma...@stratio.com>
Authored: Sat Mar 14 23:54:38 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 08:46:23 2015 +0100
----------------------------------------------------------------------
.../connectors/socket/SocketClientSink.java | 141 +++++++++++++++++++
1 file changed, 141 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3bc7852/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
new file mode 100644
index 0000000..72ba4f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Class logger
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+ private final String hostName;
+ private final int port;
+ private final SerializationSchema<IN, byte[]> scheme;
+ private transient Socket client;
+ private transient DataOutputStream dataOutputStream;
+
+ /**
+ * Default constructor.
+ *
+ * @param hostName Host of the Socket server.
+ * @param port Port of the Socket.
+ * @param schema Schema of the data.
+ */
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ this.hostName = hostName;
+ this.port = port;
+ this.scheme = schema;
+ }
+
+ /**
+ * Initializes the connection to Socket.
+ */
+ public void intializeConnection() {
+ OutputStream outputStream;
+ try {
+ client = new Socket(hostName, port);
+ outputStream = client.getOutputStream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ dataOutputStream = new DataOutputStream(outputStream);
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Socket.
+ *
+ * @param value
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN value) {
+ byte[] msg = scheme.serialize(value);
+ try {
+ dataOutputStream.write(msg);
+ } catch (IOException e) {
+ if(LOG.isErrorEnabled()){
+ LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+ }
+ }
+ }
+
+ /**
+ * Closes the connection of the Socket client.
+ */
+ private void closeConnection(){
+ try {
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing connection with socket server at "
+ + hostName + ":" + port, e);
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close connection with socket server at "
+ + hostName + ":" + port, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize the connection with the Socket in the server.
+ * @param parameters Configuration.
+ */
+ @Override
+ public void open(Configuration parameters) {
+ intializeConnection();
+ }
+
+ /**
+ * Closes the connection with the Socket server.
+ */
+ @Override
+ public void close() {
+ closeConnection();
+ }
+
+ /**
+ * Closes the connection with the Socket server.
+ */
+ @Override
+ public void cancel() {
+ close();
+ }
+
+}
[2/4] flink git commit: [FLINK-1688] [streaming] [api-extending]
Socket Client Sink added to the DataStream API
Posted by mb...@apache.org.
[FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API
Moved Serialization Schemas from connectors to core
Minor cleanups for Socket Client Sink
This closes #484
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11796495
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11796495
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11796495
Branch: refs/heads/master
Commit: 11796495762893f2c8b1f744321d0c4b5db0b881
Parents: a3bc785
Author: mbalassi <mb...@apache.org>
Authored: Sun Mar 22 10:27:30 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 12:11:05 2015 +0100
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 2 +-
.../streaming/connectors/ConnectorSource.java | 2 +-
.../streaming/connectors/flume/FlumeSink.java | 8 +-
.../streaming/connectors/flume/FlumeSource.java | 2 +-
.../connectors/flume/FlumeTopology.java | 4 +-
.../connectors/kafka/KafkaConsumerExample.java | 2 +-
.../connectors/kafka/KafkaProducerExample.java | 2 +-
.../kafka/KafkaSimpleConsumerExample.java | 2 +-
.../connectors/kafka/api/KafkaSink.java | 10 +-
.../connectors/kafka/api/KafkaSource.java | 2 +-
.../kafka/api/simple/KafkaTopicUtils.java | 2 -
.../kafka/api/simple/PersistentKafkaSource.java | 2 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 8 +-
.../connectors/rabbitmq/RMQSource.java | 2 +-
.../connectors/rabbitmq/RMQTopology.java | 4 +-
.../connectors/socket/SocketClientSink.java | 141 -------------------
.../connectors/util/DeserializationSchema.java | 42 ------
.../util/JavaDefaultStringSchema.java | 41 ------
.../streaming/connectors/util/RawSchema.java | 39 -----
.../connectors/util/SerializationSchema.java | 33 -----
.../connectors/util/SimpleStringSchema.java | 40 ------
.../streaming/connectors/kafka/KafkaITCase.java | 2 +-
.../streaming/api/datastream/DataStream.java | 16 +++
.../api/function/sink/SocketClientSink.java | 138 ++++++++++++++++++
.../serialization/DeserializationSchema.java | 42 ++++++
.../serialization/JavaDefaultStringSchema.java | 41 ++++++
.../streaming/util/serialization/RawSchema.java | 39 +++++
.../util/serialization/SerializationSchema.java | 33 +++++
.../util/serialization/SimpleStringSchema.java | 40 ++++++
.../flink/streaming/api/scala/DataStream.scala | 8 ++
30 files changed, 384 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 4408ba0..8155100 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -35,7 +35,7 @@ under the License.
<packaging>jar</packaging>
- <!-- Allow users to pass custom kafka versions -->
+ <!-- Allow users to pass custom connector versions -->
<properties>
<kafka.version>0.8.2.0</kafka.version>
<rabbitmq.version>3.3.1</rabbitmq.version>
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1623943..a7b0b06 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
GenericSourceFunction<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 86fd1b1..27074ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
@@ -39,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
boolean initDone = false;
String host;
int port;
- SerializationSchema<IN, byte[]> scheme;
+ SerializationSchema<IN, byte[]> schema;
public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
this.host = host;
this.port = port;
- this.scheme = schema;
+ this.schema = schema;
}
/**
@@ -57,7 +57,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
- byte[] data = scheme.serialize(value);
+ byte[] data = schema.serialize(value);
client.sendDataToFlume(data);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 2a321a2..00661ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3cfd7d4..7c979d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class FlumeTopology {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index dd1221d..6a95b0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
public class KafkaConsumerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 1fe759a..e7abf11 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
public class KafkaProducerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 47c5a33..2fd31ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
public class KafkaSimpleConsumerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 1753561..be9eb57 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import com.google.common.base.Preconditions;
@@ -50,7 +50,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private Properties props;
private String topicId;
private String zookeeperAddress;
- private SerializationSchema<IN, byte[]> scheme;
+ private SerializationSchema<IN, byte[]> schema;
private SerializableKafkaPartitioner partitioner;
private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
@@ -91,7 +91,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
- this.scheme = serializationSchema;
+ this.schema = serializationSchema;
this.partitioner = partitioner;
}
@@ -103,7 +103,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
- this.scheme = serializationSchema;
+ this.schema = serializationSchema;
this.partitionerClass = partitioner;
}
@@ -150,7 +150,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
*/
@Override
public void invoke(IN next) {
- byte[] serialized = scheme.serialize(next);
+ byte[] serialized = schema.serialize(next);
producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4eff870..ae6c169 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -31,7 +31,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 7a10ed3..9e09ea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -82,8 +82,6 @@ public class KafkaTopicUtils {
Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
- // TODO for Kafka version 8.2.0
- // return leader.connectionString();
return leader.connectionString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 8ec298a..97225dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index dae9c6d..48f5e60 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
- private SerializationSchema<IN, byte[]> scheme;
+ private SerializationSchema<IN, byte[]> schema;
public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
- this.scheme = schema;
+ this.schema = schema;
}
/**
@@ -72,7 +72,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
try {
- byte[] msg = scheme.serialize(value);
+ byte[] msg = schema.serialize(value);
channel.basicPublish("", QUEUE_NAME, null, msg);
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 12ad3d6..03b6d10 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index a6ca9ae..0f06235 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class RMQTopology {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
deleted file mode 100644
index 72ba4f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.socket;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- /**
- * Class logger
- */
- private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
- private final String hostName;
- private final int port;
- private final SerializationSchema<IN, byte[]> scheme;
- private transient Socket client;
- private transient DataOutputStream dataOutputStream;
-
- /**
- * Default constructor.
- *
- * @param hostName Host of the Socket server.
- * @param port Port of the Socket.
- * @param schema Schema of the data.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
- this.hostName = hostName;
- this.port = port;
- this.scheme = schema;
- }
-
- /**
- * Initializes the connection to Socket.
- */
- public void intializeConnection() {
- OutputStream outputStream;
- try {
- client = new Socket(hostName, port);
- outputStream = client.getOutputStream();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- dataOutputStream = new DataOutputStream(outputStream);
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Socket.
- *
- * @param value
- * The incoming data
- */
- @Override
- public void invoke(IN value) {
- byte[] msg = scheme.serialize(value);
- try {
- dataOutputStream.write(msg);
- } catch (IOException e) {
- if(LOG.isErrorEnabled()){
- LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
- }
- }
- }
-
- /**
- * Closes the connection of the Socket client.
- */
- private void closeConnection(){
- try {
- client.close();
- } catch (IOException e) {
- throw new RuntimeException("Error while closing connection with socket server at "
- + hostName + ":" + port, e);
- } finally {
- if (client != null) {
- try {
- client.close();
- } catch (IOException e) {
- LOG.error("Cannot close connection with socket server at "
- + hostName + ":" + port, e);
- }
- }
- }
- }
-
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) {
- intializeConnection();
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() {
- closeConnection();
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void cancel() {
- close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
deleted file mode 100644
index 4507a1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationSchema<T> extends Serializable {
-
- /**
- * Deserializes the incoming data.
- *
- * @param message
- * The incoming message in a byte array
- * @return The deserialized message in the required format.
- */
- public T deserialize(byte[] message);
-
- /**
- * Method to decide whether the element signals the end of the stream. If
- * true is returned the element won't be emitted
- *
- * @param nextElement
- * The element to test for end signal
- * @return The end signal, if true the stream shuts down
- */
- public boolean isEndOfStream(T nextElement);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
deleted file mode 100644
index 569d3e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import org.apache.commons.lang3.SerializationUtils;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return nextElement.equals("q");
- }
-
- @Override
- public byte[] serialize(String element) {
- return SerializationUtils.serialize(element);
- }
-
- @Override
- public String deserialize(byte[] message) {
- return SerializationUtils.deserialize(message);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
deleted file mode 100644
index 29c749a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawSchema implements DeserializationSchema<byte[]>,
- SerializationSchema<byte[], byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public byte[] deserialize(byte[] message) {
- return message;
- }
-
- @Override
- public boolean isEndOfStream(byte[] nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(byte[] element) {
- return element;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
deleted file mode 100644
index f8d2b2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationSchema<T, R> extends Serializable {
-
- /**
- * Serializes the incoming element to a specified type.
- *
- * @param element
- * The incoming element to be serialized
- * @return The serialized element.
- */
- public R serialize(T element);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
deleted file mode 100644
index 4b21580..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
- SerializationSchema<String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String deserialize(byte[] message) {
- return new String(message);
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public String serialize(String element) {
- return element;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 134525d..ab786ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b62a6d8..5768101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SocketClientSink;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -80,6 +81,7 @@ import org.apache.flink.streaming.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -1115,6 +1117,20 @@ public class DataStream<OUT> {
return writeToFile((OutputFormat<OUT>) of, millis);
}
+ /**
+ * Writes the DataStream to a socket as a byte array. The format of the output is
+ * specified by a {@link SerializationSchema}.
+ *
+ * @param hostName host of the socket
+ * @param port port of the socket
+ * @param schema schema for serialization
+ * @return the closed DataStream
+ */
+ public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+ DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
+ return returnStream;
+ }
+
private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
return returnStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
new file mode 100644
index 0000000..6ebcf46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+ private final String hostName;
+ private final int port;
+ private final SerializationSchema<IN, byte[]> schema;
+ private transient Socket client;
+ private transient DataOutputStream dataOutputStream;
+
+ /**
+ * Default constructor.
+ *
+ * @param hostName Host of the Socket server.
+ * @param port Port of the Socket.
+ * @param schema Schema of the data.
+ */
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ this.hostName = hostName;
+ this.port = port;
+ this.schema = schema;
+ }
+
+ /**
+ * Initializes the connection to Socket.
+ */
+ public void intializeConnection() {
+ OutputStream outputStream;
+ try {
+ client = new Socket(hostName, port);
+ outputStream = client.getOutputStream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ dataOutputStream = new DataOutputStream(outputStream);
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Socket.
+ *
+ * @param value
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN value) {
+ byte[] msg = schema.serialize(value);
+ try {
+ dataOutputStream.write(msg);
+ } catch (IOException e) {
+ if(LOG.isErrorEnabled()){
+ LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+ }
+ }
+ }
+
+ /**
+ * Closes the connection of the Socket client.
+ */
+ private void closeConnection(){
+ try {
+ dataOutputStream.flush();
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing connection with socket server at "
+ + hostName + ":" + port, e);
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close connection with socket server at "
+ + hostName + ":" + port, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize the connection with the Socket in the server.
+ * @param parameters Configuration.
+ */
+ @Override
+ public void open(Configuration parameters) {
+ intializeConnection();
+ }
+
+ /**
+ * Closes the connection with the Socket server.
+ */
+ @Override
+ public void close() {
+ closeConnection();
+ }
+
+ /**
+ * Closes the connection with the Socket server.
+ */
+ @Override
+ public void cancel() {
+ close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
new file mode 100644
index 0000000..08ef461
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+ /**
+ * Deserializes the incoming data.
+ *
+ * @param message
+ * The incoming message in a byte array
+ * @return The deserialized message in the required format.
+ */
+ public T deserialize(byte[] message);
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted
+ *
+ * @param nextElement
+ * The element to test for end signal
+ * @return The end signal, if true the stream shuts down
+ */
+ public boolean isEndOfStream(T nextElement);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
new file mode 100644
index 0000000..93d13ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return nextElement.equals("q");
+ }
+
+ @Override
+ public byte[] serialize(String element) {
+ return SerializationUtils.serialize(element);
+ }
+
+ @Override
+ public String deserialize(byte[] message) {
+ return SerializationUtils.deserialize(message);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
new file mode 100644
index 0000000..e457bef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class RawSchema implements DeserializationSchema<byte[]>,
+ SerializationSchema<byte[], byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public byte[] deserialize(byte[] message) {
+ return message;
+ }
+
+ @Override
+ public boolean isEndOfStream(byte[] nextElement) {
+ return false;
+ }
+
+ @Override
+ public byte[] serialize(byte[] element) {
+ return element;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
new file mode 100644
index 0000000..8124eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface SerializationSchema<T, R> extends Serializable {
+
+ /**
+ * Serializes the incoming element to a specified type.
+ *
+ * @param element
+ * The incoming element to be serialized
+ * @return The serialized element.
+ */
+ public R serialize(T element);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000..3d0a0d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class SimpleStringSchema implements DeserializationSchema<String>,
+ SerializationSchema<String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String deserialize(byte[] message) {
+ return new String(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return false;
+ }
+
+ @Override
+ public String serialize(String element) {
+ return element;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3dc54d6..7d77b5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -605,6 +606,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
javaStream.writeAsCsv(path, millis)
/**
+ * Writes the DataStream to a socket as a byte array. The format of the output is
+ * specified by a {@link SerializationSchema}.
+ */
+ def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
+ DataStream[T] = javaStream.writeToSocket(hostname, port, schema)
+
+ /**
* Adds the given sink to this DataStream. Only streams with sinks added
* will be executed once the StreamExecutionEnvironment.execute(...)
* method is called.
[4/4] flink git commit: [FLINK-1763] [streaming] Remove cancel from
SinkFunction
Posted by mb...@apache.org.
[FLINK-1763] [streaming] Remove cancel from SinkFunction
This closes #513
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f34162
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f34162
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f34162
Branch: refs/heads/master
Commit: 35f34162a1ba3bf3fd590d7b0e59011ad9f6160f
Parents: 2842e2f
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Mar 22 13:44:27 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 16:44:09 2015 +0100
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 7 ----
.../connectors/kafka/api/KafkaSink.java | 6 +--
.../connectors/kafka/api/KafkaSource.java | 1 +
.../api/simple/offset/BeginningOffset.java | 2 +
.../kafka/api/simple/offset/CurrentOffset.java | 2 +
.../kafka/api/simple/offset/GivenOffset.java | 1 +
.../kafka/api/simple/offset/KafkaOffset.java | 2 +
.../streaming/connectors/rabbitmq/RMQSink.java | 5 ---
.../connectors/twitter/TwitterStreaming.java | 4 --
.../streaming/connectors/kafka/KafkaITCase.java | 7 ++--
.../api/function/sink/FileSinkFunction.java | 9 ----
.../api/function/sink/PrintSinkFunction.java | 6 ---
.../api/function/sink/SinkFunction.java | 7 ----
.../api/function/sink/SocketClientSink.java | 8 ----
.../sink/WriteSinkFunctionByMillis.java | 5 ---
.../streaming/api/invokable/SinkInvokable.java | 7 ----
.../api/streamvertex/StreamVertex.java | 43 +++++++++++++++-----
.../streaming/io/SpillingBufferOrEvent.java | 1 +
.../apache/flink/streaming/api/IterateTest.java | 5 ---
.../api/collector/DirectedOutputTest.java | 6 +--
.../windowing/WindowIntegrationTest.java | 40 ------------------
.../api/streamvertex/StreamVertexTest.java | 10 +----
.../streaming/util/TestListResultSink.java | 6 ---
.../flink/streaming/api/scala/DataStream.scala | 1 -
.../StreamCheckpointingITCase.java | 4 +-
.../test/classloading/jar/StreamingProgram.java | 4 --
26 files changed, 50 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 27074ec..8112159 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,13 +134,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
}
@Override
- public void cancel() {
- if (client != null) {
- client.client.close();
- }
- }
-
- @Override
public void open(Configuration config) {
client = new FlinkRpcClientFacade();
client.init(host, port);
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index be9eb57..f1dbc8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -65,6 +65,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* @param serializationSchema
* User defined serialization schema.
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public KafkaSink(String zookeeperAddress, String topicId,
SerializationSchema<IN, byte[]> serializationSchema) {
this(zookeeperAddress, topicId, serializationSchema, (Class) null);
@@ -161,9 +162,4 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
}
}
- @Override
- public void cancel() {
- close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index ae6c169..1aa834d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -46,6 +46,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final String zookeeperAddress;
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
index f7096ad..15e7b36 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
public class BeginningOffset extends KafkaOffset {
+ private static final long serialVersionUID = 1L;
+
@Override
public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
index 3555ff9..6119f32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
public class CurrentOffset extends KafkaOffset {
+ private static final long serialVersionUID = 1L;
+
@Override
public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index 1282125..fef6325 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -21,6 +21,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
public class GivenOffset extends KafkaOffset {
+ private static final long serialVersionUID = 1L;
private final long offset;
public GivenOffset(long offset) {
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index c048ba1..4dfd314 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -28,6 +28,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
public abstract class KafkaOffset implements Serializable {
+ private static final long serialVersionUID = 1L;
+
public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
String clientName);
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 48f5e60..53db1c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,9 +108,4 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
closeChannel();
}
- @Override
- public void cancel() {
- close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 9be27eb..a32fe1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,10 +47,6 @@ public class TwitterStreaming {
System.out.println("");
}
- @Override
- public void cancel() {
- }
-
}
public static class SelectDataFlatMap extends
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index ab786ee..a094b89 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -154,14 +154,11 @@ public class KafkaITCase {
throw new SuccessException();
}
}
-
- @Override
- public void cancel() {
- }
});
// add producing topology
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
boolean running = true;
@Override
@@ -247,6 +244,8 @@ public class KafkaITCase {
public static class SuccessException extends Exception {
+ private static final long serialVersionUID = 1L;
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 5468494..24beba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,13 +115,4 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
*/
protected abstract void resetParameters();
- @Override
- public void cancel() {
- try {
- close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 0fa37ac..9ff8a7f 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -93,10 +93,4 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
-
- @Override
- public void cancel() {
- close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index ffa5a67..d4ce24e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -35,11 +35,4 @@ public interface SinkFunction<IN> extends Function, Serializable {
* @throws Exception
*/
public void invoke(IN value) throws Exception;
-
- /**
- * In case another vertex in topology fails this method is called before terminating
- * the sink. Make sure to free up any allocated resources here.
- */
- public void cancel();
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
index 6ebcf46..c582c1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -127,12 +127,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
closeConnection();
}
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void cancel() {
- close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index 53030f4..ee6df94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,9 +47,4 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
lastTime = System.currentTimeMillis();
}
- @Override
- public void cancel() {
- // No cleanup needed
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 01a295a..29d2ed2 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -40,11 +40,4 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
protected void callUserFunction() throws Exception {
sinkFunction.invoke(nextObject);
}
-
- @Override
- public void cancel() {
- super.cancel();
- sinkFunction.cancel();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index dd8a463..ae2ebdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -29,9 +29,9 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.jobmanager.BarrierAck;
import org.apache.flink.runtime.jobmanager.StateBarrierAck;
import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -152,32 +152,36 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
@Override
public void invoke() throws Exception {
+ boolean operatorOpen = false;
+
if (LOG.isDebugEnabled()) {
LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
}
try {
userInvokable.setRuntimeContext(context);
- userInvokable.open(getTaskConfiguration());
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.setRuntimeContext(context);
- invokable.open(getTaskConfiguration());
- }
+ operatorOpen = true;
+ openOperator();
userInvokable.invoke();
- userInvokable.close();
-
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.close();
- }
+ closeOperator();
+ operatorOpen = false;
if (LOG.isDebugEnabled()) {
LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
}
} catch (Exception e) {
+
+ if (operatorOpen) {
+ try {
+ closeOperator();
+ } catch (Throwable t) {
+ }
+ }
+
if (LOG.isErrorEnabled()) {
LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
}
@@ -190,6 +194,23 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
}
+ protected void openOperator() throws Exception {
+ userInvokable.open(getTaskConfiguration());
+
+ for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+ invokable.setRuntimeContext(context);
+ invokable.open(getTaskConfiguration());
+ }
+ }
+
+ protected void closeOperator() throws Exception {
+ userInvokable.close();
+
+ for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+ invokable.close();
+ }
+ }
+
protected void clearBuffers() throws IOException {
if (outputHandler != null) {
outputHandler.clearWriters();
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
index 40713e2..38cc0c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
@@ -40,6 +40,7 @@ public class SpillingBufferOrEvent {
throws IOException {
this.boe = boe;
+ this.channelIndex = boe.getChannelIndex();
this.spillReader = reader;
if (shouldSpill()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 92d23aa..3f0c48a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -68,11 +68,6 @@ public class IterateTest {
@Override
public void invoke(Boolean tuple) {
}
-
- @Override
- public void cancel() {
- }
-
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 62872fd..bc7fe73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,9 +88,6 @@ public class DirectedOutputTest {
this.list = outputs.get(name);
}
- @Override
- public void cancel() {
- }
}
private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
@@ -115,6 +112,7 @@ public class DirectedOutputTest {
assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
evenAndOddSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getSortedResult());
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+ allSink.getSortedResult());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 34986c8..f0b5500 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -286,10 +286,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -303,10 +299,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -320,10 +312,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -337,10 +325,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -354,10 +338,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -371,10 +351,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -388,10 +364,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -405,10 +377,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -422,10 +390,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
@SuppressWarnings("serial")
@@ -439,10 +403,6 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
- @Override
- public void cancel() {
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index a88a60d..0c3ff83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -83,9 +83,6 @@ public class StreamVertexTest {
data.put(k, v);
}
- @Override
- public void cancel() {
- }
}
@SuppressWarnings("unused")
@@ -134,13 +131,13 @@ public class StreamVertexTest {
@Override
public String map1(String value) {
-// System.out.println(value);
+ // System.out.println(value);
return value;
}
@Override
public String map2(Long value) {
-// System.out.println(value);
+ // System.out.println(value);
return value.toString();
}
}
@@ -154,9 +151,6 @@ public class StreamVertexTest {
result.add(value);
}
- @Override
- public void cancel() {
- }
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
index 8b78a42..87f290f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.util;
-
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
@@ -72,9 +71,4 @@ public class TestListResultSink<T> extends RichSinkFunction<T> {
return sortedList;
}
}
-
- @Override
- public void cancel() {
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7d77b5b..59f86b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -634,7 +634,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
val sinkFunction = new SinkFunction[T] {
val cleanFun = clean(fun)
def invoke(in: T) = cleanFun(in)
- def cancel() = {}
}
this.addSink(sinkFunction)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 3accb11..12c6b41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -178,9 +178,7 @@ public class StreamCheckpointingITCase {
}
}
- @Override
- public void cancel() {}
- });
+ });
env.execute();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 18b52c5..9a244a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -98,9 +98,5 @@ public class StreamingProgram {
@Override
public void invoke(Word value) throws Exception {
}
-
- @Override
- public void cancel() {
- }
}
}
[3/4] flink git commit: [FLINK-1756] [streaming] Rename Stream
Monitoring to Stream Checkpointing
Posted by mb...@apache.org.
[FLINK-1756] [streaming] Rename Stream Monitoring to Stream Checkpointing
Also set the default checkpoint interval to 5 secs instead of 10.
This closes #506
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2842e2fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2842e2fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2842e2fd
Branch: refs/heads/master
Commit: 2842e2fd9893add426c42aa40436b898e40a1ffc
Parents: 1179649
Author: mbalassi <mb...@apache.org>
Authored: Fri Mar 20 16:06:56 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 16:35:06 2015 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 30 ++++++++++----------
.../flink/runtime/jobmanager/JobManager.scala | 8 +++---
.../apache/flink/streaming/api/StreamGraph.java | 20 ++++++-------
.../api/StreamingJobGraphGenerator.java | 6 ++--
.../environment/StreamExecutionEnvironment.java | 10 +++----
.../api/scala/StreamExecutionEnvironment.scala | 8 +++---
6 files changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 46ec445..91d01a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,11 +178,11 @@ public class ExecutionGraph implements Serializable {
private ActorContext parentContext;
- private ActorRef stateMonitorActor;
+ private ActorRef stateCheckpointerActor;
- private boolean monitoringEnabled;
+ private boolean checkpointingEnabled;
- private long monitoringInterval = 10000;
+ private long checkpointingInterval = 5000;
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
@@ -223,12 +223,12 @@ public class ExecutionGraph implements Serializable {
// --------------------------------------------------------------------------------------------
- public void setStateMonitorActor(ActorRef stateMonitorActor) {
- this.stateMonitorActor = stateMonitorActor;
+ public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) {
+ this.stateCheckpointerActor = stateCheckpointerActor;
}
- public ActorRef getStateMonitorActor() {
- return stateMonitorActor;
+ public ActorRef getStateCheckpointerActor() {
+ return stateCheckpointerActor;
}
public void setParentContext(ActorContext parentContext) {
@@ -289,12 +289,12 @@ public class ExecutionGraph implements Serializable {
}
}
- public void setMonitoringEnabled(boolean monitoringEnabled) {
- this.monitoringEnabled = monitoringEnabled;
+ public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+ this.checkpointingEnabled = checkpointingEnabled;
}
- public void setMonitoringInterval(long monitoringInterval) {
- this.monitoringInterval = monitoringInterval;
+ public void setCheckpointingInterval(long checkpointingInterval) {
+ this.checkpointingInterval = checkpointingInterval;
}
/**
@@ -451,9 +451,9 @@ public class ExecutionGraph implements Serializable {
throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
}
- if (monitoringEnabled) {
- stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
- Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+ if (checkpointingEnabled) {
+ stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this,
+ Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS));
}
}
else {
@@ -777,6 +777,6 @@ public class ExecutionGraph implements Serializable {
scheduler = null;
parentContext = null;
- stateMonitorActor = null;
+ stateCheckpointerActor = null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 70a1cbb..61a0aea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -311,13 +311,13 @@ class JobManager(val configuration: Configuration,
case msg: BarrierAck =>
currentJobs.get(msg.jobID) match {
case Some(jobExecution) =>
- jobExecution._1.getStateMonitorActor forward msg
+ jobExecution._1.getStateCheckpointerActor forward msg
case None =>
}
case msg: StateBarrierAck =>
currentJobs.get(msg.jobID) match {
case Some(jobExecution) =>
- jobExecution._1.getStateMonitorActor forward msg
+ jobExecution._1.getStateCheckpointerActor forward msg
case None =>
}
@@ -487,8 +487,8 @@ class JobManager(val configuration: Configuration,
executionGraph.setScheduleMode(jobGraph.getScheduleMode)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
- executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
- executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
+ executionGraph.setCheckpointingEnabled(jobGraph.isMonitoringEnabled)
+ executionGraph.setCheckpointingInterval(jobGraph.getMonitorInterval)
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 71706bc..970ce49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,9 +92,9 @@ public class StreamGraph extends StreamingPlan {
private ExecutionConfig executionConfig;
- private boolean monitoringEnabled;
+ private boolean checkpointingEnabled;
- private long monitoringInterval = 10000;
+ private long checkpointingInterval = 5000;
public StreamGraph(ExecutionConfig executionConfig) {
@@ -559,20 +559,20 @@ public class StreamGraph extends StreamingPlan {
return executionConfig;
}
- public void setMonitoringEnabled(boolean monitoringEnabled) {
- this.monitoringEnabled = monitoringEnabled;
+ public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+ this.checkpointingEnabled = checkpointingEnabled;
}
- public boolean isMonitoringEnabled() {
- return monitoringEnabled;
+ public boolean isCheckpointingEnabled() {
+ return checkpointingEnabled;
}
- public void setMonitoringInterval(long monitoringInterval) {
- this.monitoringInterval = monitoringInterval;
+ public void setCheckpointingInterval(long checkpointingInterval) {
+ this.checkpointingInterval = checkpointingInterval;
}
- public long getMonitoringInterval() {
- return monitoringInterval;
+ public long getCheckpointingInterval() {
+ return checkpointingInterval;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 544ccc6..ad744d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -78,8 +78,8 @@ public class StreamingJobGraphGenerator {
// Turn lazy scheduling off
jobGraph.setScheduleMode(ScheduleMode.ALL);
jobGraph.setJobType(JobGraph.JobType.STREAMING);
- jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
- jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+ jobGraph.setMonitoringEnabled(streamGraph.isCheckpointingEnabled());
+ jobGraph.setMonitorInterval(streamGraph.getCheckpointingInterval());
if(jobGraph.isMonitoringEnabled()) {
int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
@@ -257,7 +257,7 @@ public class StreamingJobGraphGenerator {
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
- config.setStateMonitoring(streamGraph.isMonitoringEnabled());
+ config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2096745..994ff15 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,9 +163,9 @@ public abstract class StreamExecutionEnvironment {
*
* @param interval Time interval between state checkpoints in millis
*/
- public StreamExecutionEnvironment enableMonitoring(long interval) {
- streamGraph.setMonitoringEnabled(true);
- streamGraph.setMonitoringInterval(interval);
+ public StreamExecutionEnvironment enableCheckpointing(long interval) {
+ streamGraph.setCheckpointingEnabled(true);
+ streamGraph.setCheckpointingInterval(interval);
return this;
}
@@ -178,8 +178,8 @@ public abstract class StreamExecutionEnvironment {
* otherwise with calling with the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
* method in case of failure the job will be resubmitted to the cluster indefinitely.
*/
- public StreamExecutionEnvironment enableMonitoring() {
- streamGraph.setMonitoringEnabled(true);
+ public StreamExecutionEnvironment enableCheckpointing() {
+ streamGraph.setCheckpointingEnabled(true);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 4672fca..2208388 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -84,8 +84,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
* failure the job will be resubmitted to the cluster indefinitely.
*/
- def enableMonitoring(interval : Long) : StreamExecutionEnvironment = {
- javaEnv.enableMonitoring(interval)
+ def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing(interval)
this
}
@@ -98,8 +98,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
* failure the job will be resubmitted to the cluster indefinitely.
*/
- def enableMonitoring() : StreamExecutionEnvironment = {
- javaEnv.enableMonitoring()
+ def enableCheckpointing() : StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing()
this
}