Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/16 08:19:19 UTC

flink git commit: [FLINK-8175] remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

Repository: flink
Updated Branches:
  refs/heads/master 1440e4feb -> 907361d86


[FLINK-8175] remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

update documentation

move classes to the /experimental package

update license headers

reorganize the Scala class structure

enforce style checks and change API annotations

This closes #5112.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/907361d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/907361d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/907361d8

Branch: refs/heads/master
Commit: 907361d862c77a70ff60d27e7fcc13647eac0e6d
Parents: 1440e4f
Author: Bowen Li <bo...@gmail.com>
Authored: Sat Dec 2 00:18:57 2017 -0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 16 09:13:24 2018 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md                      |   7 +-
 flink-contrib/flink-streaming-contrib/pom.xml   | 187 ------------------
 .../flink/contrib/streaming/CollectSink.java    | 116 -----------
 .../contrib/streaming/DataStreamUtils.java      | 110 -----------
 .../contrib/streaming/SocketStreamIterator.java | 189 ------------------
 .../contrib/streaming/scala/utils/package.scala |  48 -----
 .../flink/contrib/streaming/CollectITCase.java  |  66 -------
 .../streaming/SocketStreamIteratorTest.java     | 109 -----------
 .../src/test/resources/log4j-test.properties    |  27 ---
 flink-contrib/pom.xml                           |   1 -
 .../streaming/experimental/CollectSink.java     | 121 ++++++++++++
 .../streaming/experimental/DataStreamUtils.java | 115 +++++++++++
 .../experimental/SocketStreamIterator.java      | 194 +++++++++++++++++++
 .../streaming/experimental/package-info.java    |  25 +++
 .../experimental/SocketStreamIteratorTest.java  | 112 +++++++++++
 .../experimental/scala/DataStreamUtils.scala    |  48 +++++
 .../streaming/experimental/CollectITCase.java   |  70 +++++++
 17 files changed, 690 insertions(+), 855 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 32cb519..dee2f78 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -598,7 +598,7 @@ Flink also provides a sink to collect DataStream results for testing and debuggi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
+import org.apache.flink.streaming.experimental.DataStreamUtils
 
 DataStream<Tuple2<String, Integer>> myResult = ...
 Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
@@ -608,7 +608,7 @@ Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
+import org.apache.flink.streaming.experimental.DataStreamUtils
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
 val myResult: DataStream[(String, Int)] = ...
@@ -619,6 +619,9 @@ val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStr
 
 {% top %}
 
+**Note:** `flink-streaming-contrib` module is removed from Flink 1.5.0.
+Its classes have been moved into `flink-streaming-java` and `flink-streaming-scala`.
+
 Where to go next?
 -----------------
 

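For reference, a minimal sketch of using the relocated utility from user code, modelled on the documentation snippet above and on CollectITCase (the class name CollectExample is illustrative and not part of this commit):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.experimental.DataStreamUtils;

import java.util.Iterator;

public class CollectExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// a small bounded stream to collect back to the client
		DataStream<Long> stream = env.generateSequence(1, 10);

		// collect() attaches a CollectSink and triggers execution of the
		// program on a background thread, so env.execute() is not called here
		Iterator<Long> results = DataStreamUtils.collect(stream);
		while (results.hasNext()) {
			System.out.println(results.next());
		}
	}
}
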
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
deleted file mode 100644
index b238164..0000000
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ /dev/null
@@ -1,187 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-contrib</artifactId>
-		<version>1.5-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-contrib_${scala.binary.version}</artifactId>
-	<name>flink-streaming-contrib</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-					<compilerPlugins combine.children="append">
-						<compilerPlugin>
-							<groupId>org.scalamacros</groupId>
-							<artifactId>paradise_${scala.version}</artifactId>
-							<version>${scala.macros.version}</version>
-						</compilerPlugin>
-					</compilerPlugins>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
deleted file mode 100644
index 13127fe..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-
-/**
- * A specialized data sink to be used by DataStreamUtils.collect.
- */
-class CollectSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final InetAddress hostIp;
-	private final int port;
-	private final TypeSerializer<IN> serializer;
-
-	private transient Socket client;
-	private transient OutputStream outputStream;
-	private transient DataOutputViewStreamWrapper streamWriter;
-
-	/**
-	 * Creates a CollectSink that will send the data to the specified host.
-	 *
-	 * @param hostIp IP address of the Socket server.
-	 * @param port Port of the Socket server.
-	 * @param serializer A serializer for the data.
-	 */
-	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
-		this.hostIp = hostIp;
-		this.port = port;
-		this.serializer = serializer;
-	}
-
-	@Override
-	public void invoke(IN value) throws Exception {
-		try {
-			serializer.serialize(value, streamWriter);
-		}
-		catch (Exception e) {
-			throw new IOException("Error sending data back to client (" + hostIp.toString() + ":" + port + ')', e);
-		}
-	}
-
-	/**
-	 * Initialize the connection with the Socket in the server.
-	 * @param parameters Configuration.
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		try {
-			client = new Socket(hostIp, port);
-			outputStream = client.getOutputStream();
-			streamWriter = new DataOutputViewStreamWrapper(outputStream);
-		}
-		catch (IOException e) {
-			throw new IOException("Cannot connect to the client to send back the stream", e);
-		}
-	}
-
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void close() throws Exception {
-		try {
-			if (outputStream != null) {
-				outputStream.flush();
-				outputStream.close();
-			}
-
-			// first regular attempt to cleanly close. Failing that will escalate
-			if (client != null) {
-				client.close();
-			}
-		}
-		catch (Exception e) {
-			throw new IOException("Error while closing connection that streams data back to client at "
-					+ hostIp.toString() + ":" + port, e);
-		}
-		finally {
-			// if we failed prior to closing the client, close it
-			if (client != null) {
-				try {
-					client.close();
-				}
-				catch (Throwable t) {
-					// best effort to close, we do not care about an exception here any more
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
deleted file mode 100644
index 430c98c..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.net.ConnectionUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-/**
- * A collection of utilities for {@link DataStream DataStreams}.
- */
-public final class DataStreamUtils {
-
-	/**
-	 * Returns an iterator to iterate over the elements of the DataStream.
-	 * @return The iterator
-	 */
-	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
-
-		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
-				stream.getExecutionEnvironment().getConfig());
-
-		SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
-
-		//Find out what IP of us should be given to CollectSink, that it will be able to connect to
-		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
-		InetAddress clientAddress;
-
-		if (env instanceof RemoteStreamEnvironment) {
-			String host = ((RemoteStreamEnvironment) env).getHost();
-			int port = ((RemoteStreamEnvironment) env).getPort();
-			try {
-				clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
-			}
-			catch (Exception e) {
-				throw new IOException("Could not determine an suitable network address to " +
-						"receive back data from the streaming program.", e);
-			}
-		} else if (env instanceof LocalStreamEnvironment) {
-			clientAddress = InetAddress.getLoopbackAddress();
-		} else {
-			try {
-				clientAddress = InetAddress.getLocalHost();
-			} catch (UnknownHostException e) {
-				throw new IOException("Could not determine this machines own local address to " +
-						"receive back data from the streaming program.", e);
-			}
-		}
-
-		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
-		sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
-
-		(new CallExecute(env, iter)).start();
-
-		return iter;
-	}
-
-	private static class CallExecute extends Thread {
-
-		private final StreamExecutionEnvironment toTrigger;
-		private final SocketStreamIterator<?> toNotify;
-
-		private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
-			this.toTrigger = toTrigger;
-			this.toNotify = toNotify;
-		}
-
-		@Override
-		public void run(){
-			try {
-				toTrigger.execute();
-			}
-			catch (Throwable t) {
-				toNotify.notifyOfError(t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private DataStreamUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
deleted file mode 100644
index fddfe4e..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * An iterator that returns the data from a socket stream.
- *
- * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
- * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
- * deserializing, and returning the data from that socket.
- *
- * @param <T> The type of elements returned from the iterator.
- */
-class SocketStreamIterator<T> implements Iterator<T> {
-
-	/** Server socket to listen at. */
-	private final ServerSocket socket;
-
-	/** Serializer to deserialize stream. */
-	private final TypeSerializer<T> serializer;
-
-	/** Set by the same thread that reads it. */
-	private DataInputViewStreamWrapper inStream;
-
-	/** Next element, handover from hasNext() to next(). */
-	private T next;
-
-	/** The socket for the specific stream. */
-	private Socket connectedSocket;
-
-	/** Async error, for example by the executor of the program that produces the stream. */
-	private volatile Throwable error;
-
-	SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
-		this.serializer = serializer;
-		try {
-			socket = new ServerSocket(0, 1);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Could not open socket to receive back stream results");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the port on which the iterator is getting the data. (Used internally.)
-	 * @return The port
-	 */
-	public int getPort() {
-		return socket.getLocalPort();
-	}
-
-	public InetAddress getBindAddress() {
-		return socket.getInetAddress();
-	}
-
-	public void close() {
-		if (connectedSocket != null) {
-			try {
-				connectedSocket.close();
-			} catch (Throwable ignored) {}
-		}
-
-		try {
-			socket.close();
-		} catch (Throwable ignored) {}
-	}
-
-	// ------------------------------------------------------------------------
-	//  iterator semantics
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns true if the DataStream has more elements.
-	 * (Note: blocks if there will be more elements, but they are not available yet.)
-	 * @return true if the DataStream has more elements
-	 */
-	@Override
-	public boolean hasNext() {
-		if (next == null) {
-			try {
-				next = readNextFromStream();
-			} catch (Exception e) {
-				throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e);
-			}
-		}
-
-		return next != null;
-	}
-
-	/**
-	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
-	 * @return The element
-	 * @throws NoSuchElementException if the stream has already ended
-	 */
-	@Override
-	public T next() {
-		if (hasNext()) {
-			T current = next;
-			next = null;
-			return current;
-		} else {
-			throw new NoSuchElementException();
-		}
-	}
-
-	@Override
-	public void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	private T readNextFromStream() throws Exception {
-		try {
-			if (inStream == null) {
-				connectedSocket = socket.accept();
-				inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream());
-			}
-
-			return serializer.deserialize(inStream);
-		}
-		catch (EOFException e) {
-			try {
-				connectedSocket.close();
-			} catch (Throwable ignored) {}
-
-			try {
-				socket.close();
-			} catch (Throwable ignored) {}
-
-			return null;
-		}
-		catch (Exception e) {
-			if (error == null) {
-				throw e;
-			}
-			else {
-				// throw the root cause error
-				throw new Exception("Receiving stream failed: " + error.getMessage(), error);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  errors
-	// ------------------------------------------------------------------------
-
-	public void notifyOfError(Throwable error) {
-		if (error != null && this.error == null) {
-			this.error = error;
-
-			// this should wake up any blocking calls
-			try {
-				connectedSocket.close();
-			} catch (Throwable ignored) {}
-			try {
-				socket.close();
-			} catch (Throwable ignored) {}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
deleted file mode 100644
index 86a2bdc..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.contrib.streaming.{DataStreamUtils => JavaStreamUtils}
-import org.apache.flink.streaming.api.scala._
-
-import _root_.scala.reflect.ClassTag
-import scala.collection.JavaConverters._
-
-package object utils {
-
-  /**
-   * This class provides simple utility methods for collecting a [[DataStream]],
-   * effectively enriching it with the functionality encapsulated by [[JavaStreamUtils]].
-   *
-   * @param self DataStream
-   */
-  implicit class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
-
-    /**
-      * Returns a scala iterator to iterate over the elements of the DataStream.
-      * @return The iterator
-      */
-    def collect() : Iterator[T] = {
-      JavaStreamUtils.collect(self.javaStream).asScala
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
deleted file mode 100644
index 55a4df3..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This test verifies the behavior of DataStreamUtils.collect.
- */
-public class CollectITCase extends TestLogger {
-
-	@Test
-	public void testCollect() throws Exception {
-		final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
-		try {
-			cluster.start();
-
-			TestStreamEnvironment.setAsContext(cluster, 1);
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-			final long n = 10;
-			DataStream<Long> stream = env.generateSequence(1, n);
-
-			long i = 1;
-			for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) {
-				long x = it.next();
-				assertEquals("received wrong element", i, x);
-				i++;
-			}
-
-			assertEquals("received wrong number of elements", n + 1, i);
-		}
-		finally {
-			TestStreamEnvironment.unsetAsContext();
-			cluster.stop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
deleted file mode 100644
index 0693ce2..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import org.junit.Test;
-
-import java.net.Socket;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the SocketStreamIterator.
- */
-public class SocketStreamIteratorTest {
-
-	@Test
-	public void testIterator() throws Exception {
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final long seed = new Random().nextLong();
-		final int numElements = 1000;
-
-		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
-
-		Thread writer = new Thread() {
-
-			@Override
-			public void run() {
-				try {
-					try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort());
-						DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) {
-
-						final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
-						final Random rnd = new Random(seed);
-
-						for (int i = 0; i < numElements; i++) {
-							serializer.serialize(rnd.nextLong(), out);
-						}
-					}
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-
-		writer.start();
-
-		final Random validator = new Random(seed);
-		for (int i = 0; i < numElements; i++) {
-			assertTrue(iterator.hasNext());
-			assertTrue(iterator.hasNext());
-			assertEquals(validator.nextLong(), iterator.next().longValue());
-		}
-
-		assertFalse(iterator.hasNext());
-		writer.join();
-		assertFalse(iterator.hasNext());
-	}
-
-	@Test
-	public void testIteratorWithException() throws Exception {
-
-		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
-
-		// asynchronously set an error
-		new Thread() {
-			@Override
-			public void run() {
-				try {
-					Thread.sleep(100);
-				} catch (InterruptedException ignored) {}
-				iterator.notifyOfError(new Exception("test"));
-			}
-		}.start();
-
-		try {
-			iterator.hasNext();
-		}
-		catch (Exception e) {
-			assertTrue(e.getCause().getMessage().contains("test"));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties b/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
deleted file mode 100644
index 45a18ec..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=ON, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index d80ddb2..30cadfa 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -39,7 +39,6 @@ under the License.
 	<modules>
 		<module>flink-storm</module>
 		<module>flink-storm-examples</module>
-		<module>flink-streaming-contrib</module>
 		<module>flink-connector-wikiedits</module>
 		<module>flink-statebackend-rocksdb</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
new file mode 100644
index 0000000..23b5280
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+
+/**
+ * A specialized data sink to be used by DataStreamUtils.collect().
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+@Internal
+class CollectSink<IN> extends RichSinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final InetAddress hostIp;
+	private final int port;
+	private final TypeSerializer<IN> serializer;
+
+	private transient Socket client;
+	private transient OutputStream outputStream;
+	private transient DataOutputViewStreamWrapper streamWriter;
+
+	/**
+	 * Creates a CollectSink that will send the data to the specified host.
+	 *
+	 * @param hostIp IP address of the Socket server.
+	 * @param port Port of the Socket server.
+	 * @param serializer A serializer for the data.
+	 */
+	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
+		this.hostIp = hostIp;
+		this.port = port;
+		this.serializer = serializer;
+	}
+
+	@Override
+	public void invoke(IN value) throws Exception {
+		try {
+			serializer.serialize(value, streamWriter);
+		}
+		catch (Exception e) {
+			throw new IOException("Error sending data back to client (" + hostIp.toString() + ":" + port + ')', e);
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		try {
+			client = new Socket(hostIp, port);
+			outputStream = client.getOutputStream();
+			streamWriter = new DataOutputViewStreamWrapper(outputStream);
+		}
+		catch (IOException e) {
+			throw new IOException("Cannot connect to the client to send back the stream", e);
+		}
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			if (outputStream != null) {
+				outputStream.flush();
+				outputStream.close();
+			}
+
+			// first regular attempt to cleanly close. Failing that will escalate
+			if (client != null) {
+				client.close();
+			}
+		}
+		catch (Exception e) {
+			throw new IOException("Error while closing connection that streams data back to client at "
+					+ hostIp.toString() + ":" + port, e);
+		}
+		finally {
+			// if we failed prior to closing the client, close it
+			if (client != null) {
+				try {
+					client.close();
+				}
+				catch (Throwable t) {
+					// best effort to close, we do not care about an exception here any more
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
new file mode 100644
index 0000000..59ad6a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+
+/**
+ * A collection of utilities for {@link DataStream DataStreams}.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+@PublicEvolving
+public final class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 * @return The iterator
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
+
+		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
+				stream.getExecutionEnvironment().getConfig());
+
+		SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
+
+		//Find out what IP of us should be given to CollectSink, that it will be able to connect to
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		InetAddress clientAddress;
+
+		if (env instanceof RemoteStreamEnvironment) {
+			String host = ((RemoteStreamEnvironment) env).getHost();
+			int port = ((RemoteStreamEnvironment) env).getPort();
+			try {
+				clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+			}
+			catch (Exception e) {
+				throw new IOException("Could not determine an suitable network address to " +
+						"receive back data from the streaming program.", e);
+			}
+		} else if (env instanceof LocalStreamEnvironment) {
+			clientAddress = InetAddress.getLoopbackAddress();
+		} else {
+			try {
+				clientAddress = InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new IOException("Could not determine this machines own local address to " +
+						"receive back data from the streaming program.", e);
+			}
+		}
+
+		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
+		sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
+
+		(new CallExecute(env, iter)).start();
+
+		return iter;
+	}
+
+	private static class CallExecute extends Thread {
+
+		private final StreamExecutionEnvironment toTrigger;
+		private final SocketStreamIterator<?> toNotify;
+
+		private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
+			this.toTrigger = toTrigger;
+			this.toNotify = toNotify;
+		}
+
+		@Override
+		public void run(){
+			try {
+				toTrigger.execute();
+			}
+			catch (Throwable t) {
+				toNotify.notifyOfError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private DataStreamUtils() {}
+}

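Since collect() fixes the sink parallelism to 1 (only one connection to the iterator's socket will work) and runs the job on a background thread, the returned iterator can simply be drained on the calling thread. A hypothetical test helper as a sketch (CollectTestUtil and collectToList are not part of this commit):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.experimental.DataStreamUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public final class CollectTestUtil {

	private CollectTestUtil() {}

	/** Drains the iterator returned by DataStreamUtils.collect() into a list, e.g. for test assertions. */
	public static <T> List<T> collectToList(DataStream<T> stream) throws IOException {
		List<T> results = new ArrayList<>();
		Iterator<T> it = DataStreamUtils.collect(stream);
		while (it.hasNext()) {
			results.add(it.next());
		}
		return results;
	}
}
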
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
new file mode 100644
index 0000000..871c0f7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An iterator that returns the data from a socket stream.
+ *
+ * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
+ * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
+ * deserializing, and returning the data from that socket.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ *
+ * @param <T> The type of elements returned from the iterator.
+ */
+@PublicEvolving
+class SocketStreamIterator<T> implements Iterator<T> {
+
+	/** Server socket to listen at. */
+	private final ServerSocket socket;
+
+	/** Serializer to deserialize stream. */
+	private final TypeSerializer<T> serializer;
+
+	/** Set by the same thread that reads it. */
+	private DataInputViewStreamWrapper inStream;
+
+	/** Next element, handover from hasNext() to next(). */
+	private T next;
+
+	/** The socket for the specific stream. */
+	private Socket connectedSocket;
+
+	/** Async error, for example by the executor of the program that produces the stream. */
+	private volatile Throwable error;
+
+	SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
+		this.serializer = serializer;
+		try {
+			socket = new ServerSocket(0, 1);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Could not open socket to receive back stream results");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the port on which the iterator is getting the data. (Used internally.)
+	 * @return The port
+	 */
+	public int getPort() {
+		return socket.getLocalPort();
+	}
+
+	public InetAddress getBindAddress() {
+		return socket.getInetAddress();
+	}
+
+	public void close() {
+		if (connectedSocket != null) {
+			try {
+				connectedSocket.close();
+			} catch (Throwable ignored) {}
+		}
+
+		try {
+			socket.close();
+		} catch (Throwable ignored) {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  iterator semantics
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns true if the DataStream has more elements.
+	 * (Note: blocks if there will be more elements, but they are not available yet.)
+	 * @return true if the DataStream has more elements
+	 */
+	@Override
+	public boolean hasNext() {
+		if (next == null) {
+			try {
+				next = readNextFromStream();
+			} catch (Exception e) {
+				throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e);
+			}
+		}
+
+		return next != null;
+	}
+
+	/**
+	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+	 * @return The element
+	 * @throws NoSuchElementException if the stream has already ended
+	 */
+	@Override
+	public T next() {
+		if (hasNext()) {
+			T current = next;
+			next = null;
+			return current;
+		} else {
+			throw new NoSuchElementException();
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	private T readNextFromStream() throws Exception {
+		try {
+			if (inStream == null) {
+				connectedSocket = socket.accept();
+				inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream());
+			}
+
+			return serializer.deserialize(inStream);
+		}
+		catch (EOFException e) {
+			try {
+				connectedSocket.close();
+			} catch (Throwable ignored) {}
+
+			try {
+				socket.close();
+			} catch (Throwable ignored) {}
+
+			return null;
+		}
+		catch (Exception e) {
+			if (error == null) {
+				throw e;
+			}
+			else {
+				// throw the root cause error
+				throw new Exception("Receiving stream failed: " + error.getMessage(), error);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  errors
+	// ------------------------------------------------------------------------
+
+	public void notifyOfError(Throwable error) {
+		if (error != null && this.error == null) {
+			this.error = error;
+
+			// this should wake up any blocking calls
+			try {
+				connectedSocket.close();
+			} catch (Throwable ignored) {}
+			try {
+				socket.close();
+			} catch (Throwable ignored) {}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
new file mode 100644
index 0000000..57e637c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package holds classes that are experimental.
+ *
+ * <p>They are NOT battle-tested code and may be changed or removed in future versions.
+ *
+ * <p>None of the classes should be @Public.
+ */
+package org.apache.flink.streaming.experimental;

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
new file mode 100644
index 0000000..718d46c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.net.Socket;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the SocketStreamIterator.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+public class SocketStreamIteratorTest {
+
+	@Test
+	public void testIterator() throws Exception {
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final long seed = new Random().nextLong();
+		final int numElements = 1000;
+
+		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
+
+		Thread writer = new Thread() {
+
+			@Override
+			public void run() {
+				try {
+					try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort());
+						DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) {
+
+						final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
+						final Random rnd = new Random(seed);
+
+						for (int i = 0; i < numElements; i++) {
+							serializer.serialize(rnd.nextLong(), out);
+						}
+					}
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+
+		writer.start();
+
+		final Random validator = new Random(seed);
+		for (int i = 0; i < numElements; i++) {
+			assertTrue(iterator.hasNext());
+			assertTrue(iterator.hasNext());
+			assertEquals(validator.nextLong(), iterator.next().longValue());
+		}
+
+		assertFalse(iterator.hasNext());
+		writer.join();
+		assertFalse(iterator.hasNext());
+	}
+
+	@Test
+	public void testIteratorWithException() throws Exception {
+
+		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
+
+		// asynchronously set an error
+		new Thread() {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(100);
+				} catch (InterruptedException ignored) {}
+				iterator.notifyOfError(new Exception("test"));
+			}
+		}.start();
+
+		try {
+			iterator.hasNext();
+		}
+		catch (Exception e) {
+			assertTrue(e.getCause().getMessage().contains("test"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
new file mode 100644
index 0000000..8c4beff
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.experimental.{DataStreamUtils => JavaStreamUtils}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+  * This class provides simple utility methods for collecting a [[DataStream]],
+  * effectively enriching it with the functionality encapsulated by [[DataStreamUtils]].
+  *
+  * This experimental class is relocated from flink-streaming-contrib.
+  *
+  * @param self DataStream
+  */
+@PublicEvolving
+class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
+
+  /**
+    * Returns a scala iterator to iterate over the elements of the DataStream.
+    * @return The iterator
+    */
+  def collect() : Iterator[T] = {
+    JavaStreamUtils.collect(self.javaStream).asScala
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
new file mode 100644
index 0000000..ad07390
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.experimental;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.experimental.DataStreamUtils;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test verifies the behavior of DataStreamUtils.collect().
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+public class CollectITCase extends TestLogger {
+
+	@Test
+	public void testCollect() throws Exception {
+		final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
+		try {
+			cluster.start();
+
+			TestStreamEnvironment.setAsContext(cluster, 1);
+
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+			final long n = 10;
+			DataStream<Long> stream = env.generateSequence(1, n);
+
+			long i = 1;
+			for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) {
+				long x = it.next();
+				assertEquals("received wrong element", i, x);
+				i++;
+			}
+
+			assertEquals("received wrong number of elements", n + 1, i);
+		}
+		finally {
+			TestStreamEnvironment.unsetAsContext();
+			cluster.stop();
+		}
+	}
+}