You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/22 18:35:26 UTC
[2/4] flink git commit: [FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API
[FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API
Moved Serialization Schemas from connectors to core
Minor cleanups for Socket Client Sink
This closes #484
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11796495
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11796495
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11796495
Branch: refs/heads/master
Commit: 11796495762893f2c8b1f744321d0c4b5db0b881
Parents: a3bc785
Author: mbalassi <mb...@apache.org>
Authored: Sun Mar 22 10:27:30 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 12:11:05 2015 +0100
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 2 +-
.../streaming/connectors/ConnectorSource.java | 2 +-
.../streaming/connectors/flume/FlumeSink.java | 8 +-
.../streaming/connectors/flume/FlumeSource.java | 2 +-
.../connectors/flume/FlumeTopology.java | 4 +-
.../connectors/kafka/KafkaConsumerExample.java | 2 +-
.../connectors/kafka/KafkaProducerExample.java | 2 +-
.../kafka/KafkaSimpleConsumerExample.java | 2 +-
.../connectors/kafka/api/KafkaSink.java | 10 +-
.../connectors/kafka/api/KafkaSource.java | 2 +-
.../kafka/api/simple/KafkaTopicUtils.java | 2 -
.../kafka/api/simple/PersistentKafkaSource.java | 2 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 8 +-
.../connectors/rabbitmq/RMQSource.java | 2 +-
.../connectors/rabbitmq/RMQTopology.java | 4 +-
.../connectors/socket/SocketClientSink.java | 141 -------------------
.../connectors/util/DeserializationSchema.java | 42 ------
.../util/JavaDefaultStringSchema.java | 41 ------
.../streaming/connectors/util/RawSchema.java | 39 -----
.../connectors/util/SerializationSchema.java | 33 -----
.../connectors/util/SimpleStringSchema.java | 40 ------
.../streaming/connectors/kafka/KafkaITCase.java | 2 +-
.../streaming/api/datastream/DataStream.java | 16 +++
.../api/function/sink/SocketClientSink.java | 138 ++++++++++++++++++
.../serialization/DeserializationSchema.java | 42 ++++++
.../serialization/JavaDefaultStringSchema.java | 41 ++++++
.../streaming/util/serialization/RawSchema.java | 39 +++++
.../util/serialization/SerializationSchema.java | 33 +++++
.../util/serialization/SimpleStringSchema.java | 40 ++++++
.../flink/streaming/api/scala/DataStream.scala | 8 ++
30 files changed, 384 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 4408ba0..8155100 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -35,7 +35,7 @@ under the License.
<packaging>jar</packaging>
- <!-- Allow users to pass custom kafka versions -->
+ <!-- Allow users to pass custom connector versions -->
<properties>
<kafka.version>0.8.2.0</kafka.version>
<rabbitmq.version>3.3.1</rabbitmq.version>
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1623943..a7b0b06 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
GenericSourceFunction<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 86fd1b1..27074ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
@@ -39,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
boolean initDone = false;
String host;
int port;
- SerializationSchema<IN, byte[]> scheme;
+ SerializationSchema<IN, byte[]> schema;
public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
this.host = host;
this.port = port;
- this.scheme = schema;
+ this.schema = schema;
}
/**
@@ -57,7 +57,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
- byte[] data = scheme.serialize(value);
+ byte[] data = schema.serialize(value);
client.sendDataToFlume(data);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 2a321a2..00661ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3cfd7d4..7c979d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class FlumeTopology {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index dd1221d..6a95b0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
public class KafkaConsumerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 1fe759a..e7abf11 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
public class KafkaProducerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 47c5a33..2fd31ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
public class KafkaSimpleConsumerExample {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 1753561..be9eb57 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import com.google.common.base.Preconditions;
@@ -50,7 +50,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private Properties props;
private String topicId;
private String zookeeperAddress;
- private SerializationSchema<IN, byte[]> scheme;
+ private SerializationSchema<IN, byte[]> schema;
private SerializableKafkaPartitioner partitioner;
private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
@@ -91,7 +91,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
- this.scheme = serializationSchema;
+ this.schema = serializationSchema;
this.partitioner = partitioner;
}
@@ -103,7 +103,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
this.zookeeperAddress = zookeeperAddress;
this.topicId = topicId;
- this.scheme = serializationSchema;
+ this.schema = serializationSchema;
this.partitionerClass = partitioner;
}
@@ -150,7 +150,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
*/
@Override
public void invoke(IN next) {
- byte[] serialized = scheme.serialize(next);
+ byte[] serialized = schema.serialize(next);
producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4eff870..ae6c169 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -31,7 +31,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 7a10ed3..9e09ea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -82,8 +82,6 @@ public class KafkaTopicUtils {
Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
- // TODO for Kafka version 8.2.0
- // return leader.connectionString();
return leader.connectionString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 8ec298a..97225dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index dae9c6d..48f5e60 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
- private SerializationSchema<IN, byte[]> scheme;
+ private SerializationSchema<IN, byte[]> schema;
public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
- this.scheme = schema;
+ this.schema = schema;
}
/**
@@ -72,7 +72,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) {
try {
- byte[] msg = scheme.serialize(value);
+ byte[] msg = schema.serialize(value);
channel.basicPublish("", QUEUE_NAME, null, msg);
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 12ad3d6..03b6d10 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index a6ca9ae..0f06235 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class RMQTopology {
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
deleted file mode 100644
index 72ba4f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.socket;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- /**
- * Class logger
- */
- private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
- private final String hostName;
- private final int port;
- private final SerializationSchema<IN, byte[]> scheme;
- private transient Socket client;
- private transient DataOutputStream dataOutputStream;
-
- /**
- * Default constructor.
- *
- * @param hostName Host of the Socket server.
- * @param port Port of the Socket.
- * @param schema Schema of the data.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
- this.hostName = hostName;
- this.port = port;
- this.scheme = schema;
- }
-
- /**
- * Initializes the connection to Socket.
- */
- public void intializeConnection() {
- OutputStream outputStream;
- try {
- client = new Socket(hostName, port);
- outputStream = client.getOutputStream();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- dataOutputStream = new DataOutputStream(outputStream);
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Socket.
- *
- * @param value
- * The incoming data
- */
- @Override
- public void invoke(IN value) {
- byte[] msg = scheme.serialize(value);
- try {
- dataOutputStream.write(msg);
- } catch (IOException e) {
- if(LOG.isErrorEnabled()){
- LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
- }
- }
- }
-
- /**
- * Closes the connection of the Socket client.
- */
- private void closeConnection(){
- try {
- client.close();
- } catch (IOException e) {
- throw new RuntimeException("Error while closing connection with socket server at "
- + hostName + ":" + port, e);
- } finally {
- if (client != null) {
- try {
- client.close();
- } catch (IOException e) {
- LOG.error("Cannot close connection with socket server at "
- + hostName + ":" + port, e);
- }
- }
- }
- }
-
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) {
- intializeConnection();
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() {
- closeConnection();
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void cancel() {
- close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
deleted file mode 100644
index 4507a1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationSchema<T> extends Serializable {
-
- /**
- * Deserializes the incoming data.
- *
- * @param message
- * The incoming message in a byte array
- * @return The deserialized message in the required format.
- */
- public T deserialize(byte[] message);
-
- /**
- * Method to decide whether the element signals the end of the stream. If
- * true is returned the element won't be emitted
- *
- * @param nextElement
- * The element to test for end signal
- * @return The end signal, if true the stream shuts down
- */
- public boolean isEndOfStream(T nextElement);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
deleted file mode 100644
index 569d3e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import org.apache.commons.lang3.SerializationUtils;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return nextElement.equals("q");
- }
-
- @Override
- public byte[] serialize(String element) {
- return SerializationUtils.serialize(element);
- }
-
- @Override
- public String deserialize(byte[] message) {
- return SerializationUtils.deserialize(message);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
deleted file mode 100644
index 29c749a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawSchema implements DeserializationSchema<byte[]>,
- SerializationSchema<byte[], byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public byte[] deserialize(byte[] message) {
- return message;
- }
-
- @Override
- public boolean isEndOfStream(byte[] nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(byte[] element) {
- return element;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
deleted file mode 100644
index f8d2b2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationSchema<T, R> extends Serializable {
-
- /**
- * Serializes the incoming element to a specified type.
- *
- * @param element
- * The incoming element to be serialized
- * @return The serialized element.
- */
- public R serialize(T element);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
deleted file mode 100644
index 4b21580..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
- SerializationSchema<String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String deserialize(byte[] message) {
- return new String(message);
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public String serialize(String element) {
- return element;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 134525d..ab786ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b62a6d8..5768101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SocketClientSink;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -80,6 +81,7 @@ import org.apache.flink.streaming.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -1115,6 +1117,20 @@ public class DataStream<OUT> {
return writeToFile((OutputFormat<OUT>) of, millis);
}
+ /**
+  * Sends the elements of this DataStream to a socket. Each element is passed
+  * to the socket sink together with the given {@link SerializationSchema},
+  * which defines the byte array representation that is written out.
+  *
+  * @param hostName host of the socket
+  * @param port port of the socket
+  * @param schema schema for serialization
+  * @return the closed DataStream
+  */
+ public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema) {
+     final SocketClientSink<OUT> socketSink = new SocketClientSink<OUT>(hostName, port, schema);
+     return addSink(socketSink);
+ }
+
private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
return returnStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
new file mode 100644
index 0000000..6ebcf46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. Every incoming element is serialized to a
+ * byte array with the configured {@link SerializationSchema} and written to a socket.
+ *
+ * @param <IN> type of the elements written into the socket
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+    private final String hostName;
+    private final int port;
+    private final SerializationSchema<IN, byte[]> schema;
+    // Both are created in intializeConnection() and stay null until then.
+    private transient Socket client;
+    private transient DataOutputStream dataOutputStream;
+
+    /**
+     * Creates a sink that writes to the given host and port. No connection is
+     * established here; that happens in {@link #open(Configuration)}.
+     *
+     * @param hostName Host of the Socket server.
+     * @param port Port of the Socket.
+     * @param schema Schema used to turn each element into a byte array.
+     */
+    public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+        this.hostName = hostName;
+        this.port = port;
+        this.schema = schema;
+    }
+
+    /**
+     * Initializes the connection to the socket and wraps its output stream.
+     * (The misspelled method name is kept because the method is public.)
+     *
+     * @throws RuntimeException if the socket cannot be created or its output
+     *         stream cannot be obtained; the {@link IOException} is the cause
+     */
+    public void intializeConnection() {
+        try {
+            client = new Socket(hostName, port);
+            dataOutputStream = new DataOutputStream(client.getOutputStream());
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot initialize connection to socket server at "
+                    + hostName + ":" + port, e);
+        }
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to the socket.
+     * A failed write is logged and the element is dropped (best effort).
+     *
+     * @param value
+     *            The incoming data
+     */
+    @Override
+    public void invoke(IN value) {
+        byte[] msg = schema.serialize(value);
+        try {
+            dataOutputStream.write(msg);
+        } catch (IOException e) {
+            LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+        }
+    }
+
+    /**
+     * Flushes pending output and closes the socket. Safe to call when the
+     * connection was never (or only partially) established.
+     *
+     * @throws RuntimeException if flushing or closing fails; the
+     *         {@link IOException} is the cause
+     */
+    private void closeConnection() {
+        try {
+            // Null checks: open() may have failed before these were assigned.
+            if (dataOutputStream != null) {
+                dataOutputStream.flush();
+            }
+            if (client != null) {
+                client.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error while closing connection with socket server at "
+                    + hostName + ":" + port, e);
+        } finally {
+            // Only reached with an open socket when flush() or close() above failed;
+            // retry once so the socket is not leaked, but do not mask the first error.
+            if (client != null && !client.isClosed()) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error("Cannot close connection with socket server at "
+                            + hostName + ":" + port, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Initializes the connection with the socket server.
+     *
+     * @param parameters Configuration (not read by this sink).
+     */
+    @Override
+    public void open(Configuration parameters) {
+        intializeConnection();
+    }
+
+    /**
+     * Closes the connection with the socket server.
+     */
+    @Override
+    public void close() {
+        closeConnection();
+    }
+
+    /**
+     * Closes the connection with the socket server; delegates to {@link #close()}.
+     */
+    @Override
+    public void cancel() {
+        close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
new file mode 100644
index 0000000..08ef461
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * Describes how a raw byte array is turned into an element of type {@code T}
+ * and how to recognize the element that signals the end of the stream.
+ *
+ * @param <T> the type returned by {@link #deserialize(byte[])}
+ */
+public interface DeserializationSchema<T> extends Serializable {
+
+ /**
+ * Deserializes the incoming data.
+ *
+ * @param message
+ * The incoming message in a byte array
+ * @return The deserialized message in the required format.
+ */
+ public T deserialize(byte[] message);
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted
+ *
+ * @param nextElement
+ * The element to test for end signal
+ * @return The end signal, if true the stream shuts down
+ */
+ public boolean isEndOfStream(T nextElement);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
new file mode 100644
index 0000000..93d13ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+/**
+ * Schema that converts between {@link String} elements and byte arrays by
+ * delegating to commons-lang3 {@link SerializationUtils}. The string
+ * {@code "q"} is treated as the end-of-stream signal.
+ */
+public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Element value that signals the end of the stream. */
+    private static final String END_OF_STREAM_MARKER = "q";
+
+    /**
+     * Tells whether the element is the end-of-stream marker.
+     *
+     * @param nextElement the element to test; may be {@code null}
+     * @return {@code true} only if the element equals {@code "q"}
+     */
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        // Constant-first comparison: a null element is simply "not the marker"
+        // instead of causing a NullPointerException.
+        return END_OF_STREAM_MARKER.equals(nextElement);
+    }
+
+    /**
+     * Serializes the string with {@link SerializationUtils#serialize}.
+     *
+     * @param element the string to serialize
+     * @return the serialized bytes
+     */
+    @Override
+    public byte[] serialize(String element) {
+        return SerializationUtils.serialize(element);
+    }
+
+    /**
+     * Deserializes a string with {@link SerializationUtils#deserialize(byte[])}.
+     *
+     * @param message the bytes to deserialize
+     * @return the deserialized string
+     */
+    @Override
+    public String deserialize(byte[] message) {
+        return SerializationUtils.deserialize(message);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
new file mode 100644
index 0000000..e457bef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * Pass-through schema for raw byte arrays: both directions hand back the very
+ * array they were given, and no element ever ends the stream.
+ */
+public class RawSchema implements DeserializationSchema<byte[]>,
+        SerializationSchema<byte[], byte[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param element the bytes to write out
+     * @return the same array, unchanged
+     */
+    @Override
+    public byte[] serialize(byte[] element) {
+        return element;
+    }
+
+    /**
+     * @param message the received bytes
+     * @return the same array, unchanged
+     */
+    @Override
+    public byte[] deserialize(byte[] message) {
+        return message;
+    }
+
+    /**
+     * @param nextElement ignored
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isEndOfStream(byte[] nextElement) {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
new file mode 100644
index 0000000..8124eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * Describes how an element of type {@code T} is turned into its serialized
+ * form of type {@code R}.
+ *
+ * @param <T> the type of the elements passed to {@link #serialize(Object)}
+ * @param <R> the type of the serialized result
+ */
+public interface SerializationSchema<T, R> extends Serializable {
+
+ /**
+ * Serializes the incoming element to a specified type.
+ *
+ * @param element
+ * The incoming element to be serialized
+ * @return The serialized element.
+ */
+ public R serialize(T element);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000..3d0a0d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * Schema for plain strings: incoming bytes are decoded as UTF-8 text, and
+ * outgoing strings are passed through unchanged. No element ends the stream.
+ */
+public class SimpleStringSchema implements DeserializationSchema<String>,
+        SerializationSchema<String, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    // Fixed charset so decoding does not depend on the JVM's platform default.
+    // Static, so it is not part of the serialized schema.
+    private static final java.nio.charset.Charset CHARSET =
+            java.nio.charset.Charset.forName("UTF-8");
+
+    /**
+     * Decodes the message bytes as a UTF-8 string.
+     *
+     * @param message the received bytes
+     * @return the decoded string
+     */
+    @Override
+    public String deserialize(byte[] message) {
+        return new String(message, CHARSET);
+    }
+
+    /**
+     * @param nextElement ignored
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    /**
+     * @param element the string to write out
+     * @return the same string, unchanged
+     */
+    @Override
+    public String serialize(String element) {
+        return element;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3dc54d6..7d77b5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -605,6 +606,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
javaStream.writeAsCsv(path, millis)
/**
+ * Writes the DataStream to a socket as a byte array. The format of the output is
+ * specified by a [[SerializationSchema]].
+ *
+ * @param hostname host of the socket
+ * @param port port of the socket
+ * @param schema schema used to serialize the elements
+ */
+ // NOTE(review): the Java writeToSocket returns a DataStreamSink; the DataStream[T]
+ // result type presumably relies on an implicit Java-to-Scala conversion - confirm.
+ def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
+ DataStream[T] = javaStream.writeToSocket(hostname, port, schema)
+
+ /**
* Adds the given sink to this DataStream. Only streams with sinks added
* will be executed once the StreamExecutionEnvironment.execute(...)
* method is called.