You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/16 08:19:19 UTC
flink git commit: [FLINK-8175] remove flink-streaming-contrib and
migrate its classes to flink-streaming-java/scala
Repository: flink
Updated Branches:
refs/heads/master 1440e4feb -> 907361d86
[FLINK-8175] remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
update doc
move classes to /experimental
update license header
reorg scala class level
enforce stylecheck and change API annotation
This closes #5112.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/907361d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/907361d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/907361d8
Branch: refs/heads/master
Commit: 907361d862c77a70ff60d27e7fcc13647eac0e6d
Parents: 1440e4f
Author: Bowen Li <bo...@gmail.com>
Authored: Sat Dec 2 00:18:57 2017 -0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 16 09:13:24 2018 +0100
----------------------------------------------------------------------
docs/dev/datastream_api.md | 7 +-
flink-contrib/flink-streaming-contrib/pom.xml | 187 ------------------
.../flink/contrib/streaming/CollectSink.java | 116 -----------
.../contrib/streaming/DataStreamUtils.java | 110 -----------
.../contrib/streaming/SocketStreamIterator.java | 189 ------------------
.../contrib/streaming/scala/utils/package.scala | 48 -----
.../flink/contrib/streaming/CollectITCase.java | 66 -------
.../streaming/SocketStreamIteratorTest.java | 109 -----------
.../src/test/resources/log4j-test.properties | 27 ---
flink-contrib/pom.xml | 1 -
.../streaming/experimental/CollectSink.java | 121 ++++++++++++
.../streaming/experimental/DataStreamUtils.java | 115 +++++++++++
.../experimental/SocketStreamIterator.java | 194 +++++++++++++++++++
.../streaming/experimental/package-info.java | 25 +++
.../experimental/SocketStreamIteratorTest.java | 112 +++++++++++
.../experimental/scala/DataStreamUtils.scala | 48 +++++
.../streaming/experimental/CollectITCase.java | 70 +++++++
17 files changed, 690 insertions(+), 855 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 32cb519..dee2f78 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -598,7 +598,7 @@ Flink also provides a sink to collect DataStream results for testing and debuggi
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
+import org.apache.flink.streaming.experimental.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
@@ -608,7 +608,7 @@ Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
<div data-lang="scala" markdown="1">
{% highlight scala %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
+import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
@@ -619,6 +619,9 @@ val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStr
{% top %}
+**Note:** The `flink-streaming-contrib` module has been removed in Flink 1.5.0.
+Its classes have been moved into `flink-streaming-java` and `flink-streaming-scala`.
+
Where to go next?
-----------------
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
deleted file mode 100644
index b238164..0000000
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ /dev/null
@@ -1,187 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-contrib</artifactId>
- <version>1.5-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-streaming-contrib_${scala.binary.version}</artifactId>
- <name>flink-streaming-contrib</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- <compilerPlugins combine.children="append">
- <compilerPlugin>
- <groupId>org.scalamacros</groupId>
- <artifactId>paradise_${scala.version}</artifactId>
- <version>${scala.macros.version}</version>
- </compilerPlugin>
- </compilerPlugins>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Scala Code Style, most of the configuration done via plugin management -->
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <configuration>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
deleted file mode 100644
index 13127fe..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-
-/**
- * A specialized data sink to be used by DataStreamUtils.collect.
- */
-class CollectSink<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private final InetAddress hostIp;
- private final int port;
- private final TypeSerializer<IN> serializer;
-
- private transient Socket client;
- private transient OutputStream outputStream;
- private transient DataOutputViewStreamWrapper streamWriter;
-
- /**
- * Creates a CollectSink that will send the data to the specified host.
- *
- * @param hostIp IP address of the Socket server.
- * @param port Port of the Socket server.
- * @param serializer A serializer for the data.
- */
- public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
- this.hostIp = hostIp;
- this.port = port;
- this.serializer = serializer;
- }
-
- @Override
- public void invoke(IN value) throws Exception {
- try {
- serializer.serialize(value, streamWriter);
- }
- catch (Exception e) {
- throw new IOException("Error sending data back to client (" + hostIp.toString() + ":" + port + ')', e);
- }
- }
-
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- try {
- client = new Socket(hostIp, port);
- outputStream = client.getOutputStream();
- streamWriter = new DataOutputViewStreamWrapper(outputStream);
- }
- catch (IOException e) {
- throw new IOException("Cannot connect to the client to send back the stream", e);
- }
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() throws Exception {
- try {
- if (outputStream != null) {
- outputStream.flush();
- outputStream.close();
- }
-
- // first regular attempt to cleanly close. Failing that will escalate
- if (client != null) {
- client.close();
- }
- }
- catch (Exception e) {
- throw new IOException("Error while closing connection that streams data back to client at "
- + hostIp.toString() + ":" + port, e);
- }
- finally {
- // if we failed prior to closing the client, close it
- if (client != null) {
- try {
- client.close();
- }
- catch (Throwable t) {
- // best effort to close, we do not care about an exception here any more
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
deleted file mode 100644
index 430c98c..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.net.ConnectionUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-/**
- * A collection of utilities for {@link DataStream DataStreams}.
- */
-public final class DataStreamUtils {
-
- /**
- * Returns an iterator to iterate over the elements of the DataStream.
- * @return The iterator
- */
- public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
-
- TypeSerializer<OUT> serializer = stream.getType().createSerializer(
- stream.getExecutionEnvironment().getConfig());
-
- SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
-
- //Find out what IP of us should be given to CollectSink, that it will be able to connect to
- StreamExecutionEnvironment env = stream.getExecutionEnvironment();
- InetAddress clientAddress;
-
- if (env instanceof RemoteStreamEnvironment) {
- String host = ((RemoteStreamEnvironment) env).getHost();
- int port = ((RemoteStreamEnvironment) env).getPort();
- try {
- clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
- }
- catch (Exception e) {
- throw new IOException("Could not determine an suitable network address to " +
- "receive back data from the streaming program.", e);
- }
- } else if (env instanceof LocalStreamEnvironment) {
- clientAddress = InetAddress.getLoopbackAddress();
- } else {
- try {
- clientAddress = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- throw new IOException("Could not determine this machines own local address to " +
- "receive back data from the streaming program.", e);
- }
- }
-
- DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
- sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
-
- (new CallExecute(env, iter)).start();
-
- return iter;
- }
-
- private static class CallExecute extends Thread {
-
- private final StreamExecutionEnvironment toTrigger;
- private final SocketStreamIterator<?> toNotify;
-
- private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
- this.toTrigger = toTrigger;
- this.toNotify = toNotify;
- }
-
- @Override
- public void run(){
- try {
- toTrigger.execute();
- }
- catch (Throwable t) {
- toNotify.notifyOfError(t);
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Private constructor to prevent instantiation.
- */
- private DataStreamUtils() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
deleted file mode 100644
index fddfe4e..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * An iterator that returns the data from a socket stream.
- *
- * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
- * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
- * deserializing, and returning the data from that socket.
- *
- * @param <T> The type of elements returned from the iterator.
- */
-class SocketStreamIterator<T> implements Iterator<T> {
-
- /** Server socket to listen at. */
- private final ServerSocket socket;
-
- /** Serializer to deserialize stream. */
- private final TypeSerializer<T> serializer;
-
- /** Set by the same thread that reads it. */
- private DataInputViewStreamWrapper inStream;
-
- /** Next element, handover from hasNext() to next(). */
- private T next;
-
- /** The socket for the specific stream. */
- private Socket connectedSocket;
-
- /** Async error, for example by the executor of the program that produces the stream. */
- private volatile Throwable error;
-
- SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
- this.serializer = serializer;
- try {
- socket = new ServerSocket(0, 1);
- }
- catch (IOException e) {
- throw new RuntimeException("Could not open socket to receive back stream results");
- }
- }
-
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
- /**
- * Returns the port on which the iterator is getting the data. (Used internally.)
- * @return The port
- */
- public int getPort() {
- return socket.getLocalPort();
- }
-
- public InetAddress getBindAddress() {
- return socket.getInetAddress();
- }
-
- public void close() {
- if (connectedSocket != null) {
- try {
- connectedSocket.close();
- } catch (Throwable ignored) {}
- }
-
- try {
- socket.close();
- } catch (Throwable ignored) {}
- }
-
- // ------------------------------------------------------------------------
- // iterator semantics
- // ------------------------------------------------------------------------
-
- /**
- * Returns true if the DataStream has more elements.
- * (Note: blocks if there will be more elements, but they are not available yet.)
- * @return true if the DataStream has more elements
- */
- @Override
- public boolean hasNext() {
- if (next == null) {
- try {
- next = readNextFromStream();
- } catch (Exception e) {
- throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e);
- }
- }
-
- return next != null;
- }
-
- /**
- * Returns the next element of the DataStream. (Blocks if it is not available yet.)
- * @return The element
- * @throws NoSuchElementException if the stream has already ended
- */
- @Override
- public T next() {
- if (hasNext()) {
- T current = next;
- next = null;
- return current;
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private T readNextFromStream() throws Exception {
- try {
- if (inStream == null) {
- connectedSocket = socket.accept();
- inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream());
- }
-
- return serializer.deserialize(inStream);
- }
- catch (EOFException e) {
- try {
- connectedSocket.close();
- } catch (Throwable ignored) {}
-
- try {
- socket.close();
- } catch (Throwable ignored) {}
-
- return null;
- }
- catch (Exception e) {
- if (error == null) {
- throw e;
- }
- else {
- // throw the root cause error
- throw new Exception("Receiving stream failed: " + error.getMessage(), error);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // errors
- // ------------------------------------------------------------------------
-
- public void notifyOfError(Throwable error) {
- if (error != null && this.error == null) {
- this.error = error;
-
- // this should wake up any blocking calls
- try {
- connectedSocket.close();
- } catch (Throwable ignored) {}
- try {
- socket.close();
- } catch (Throwable ignored) {}
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
deleted file mode 100644
index 86a2bdc..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.contrib.streaming.{DataStreamUtils => JavaStreamUtils}
-import org.apache.flink.streaming.api.scala._
-
-import _root_.scala.reflect.ClassTag
-import scala.collection.JavaConverters._
-
-package object utils {
-
- /**
- * This class provides simple utility methods for collecting a [[DataStream]],
- * effectively enriching it with the functionality encapsulated by [[JavaStreamUtils]].
- *
- * @param self DataStream
- */
- implicit class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
-
- /**
- * Returns a scala iterator to iterate over the elements of the DataStream.
- * @return The iterator
- */
- def collect() : Iterator[T] = {
- JavaStreamUtils.collect(self.javaStream).asScala
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
deleted file mode 100644
index 55a4df3..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This test verifies the behavior of DataStreamUtils.collect.
- */
-public class CollectITCase extends TestLogger {
-
- @Test
- public void testCollect() throws Exception {
- final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
- try {
- cluster.start();
-
- TestStreamEnvironment.setAsContext(cluster, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- final long n = 10;
- DataStream<Long> stream = env.generateSequence(1, n);
-
- long i = 1;
- for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) {
- long x = it.next();
- assertEquals("received wrong element", i, x);
- i++;
- }
-
- assertEquals("received wrong number of elements", n + 1, i);
- }
- finally {
- TestStreamEnvironment.unsetAsContext();
- cluster.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
deleted file mode 100644
index 0693ce2..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import org.junit.Test;
-
-import java.net.Socket;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the SocketStreamIterator.
- */
-public class SocketStreamIteratorTest {
-
- @Test
- public void testIterator() throws Exception {
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final long seed = new Random().nextLong();
- final int numElements = 1000;
-
- final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
-
- Thread writer = new Thread() {
-
- @Override
- public void run() {
- try {
- try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort());
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) {
-
- final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
- final Random rnd = new Random(seed);
-
- for (int i = 0; i < numElements; i++) {
- serializer.serialize(rnd.nextLong(), out);
- }
- }
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
-
- writer.start();
-
- final Random validator = new Random(seed);
- for (int i = 0; i < numElements; i++) {
- assertTrue(iterator.hasNext());
- assertTrue(iterator.hasNext());
- assertEquals(validator.nextLong(), iterator.next().longValue());
- }
-
- assertFalse(iterator.hasNext());
- writer.join();
- assertFalse(iterator.hasNext());
- }
-
- @Test
- public void testIteratorWithException() throws Exception {
-
- final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
-
- // asynchronously set an error
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {}
- iterator.notifyOfError(new Exception("test"));
- }
- }.start();
-
- try {
- iterator.hasNext();
- }
- catch (Exception e) {
- assertTrue(e.getCause().getMessage().contains("test"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties b/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
deleted file mode 100644
index 45a18ec..0000000
--- a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=ON, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index d80ddb2..30cadfa 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -39,7 +39,6 @@ under the License.
<modules>
<module>flink-storm</module>
<module>flink-storm-examples</module>
- <module>flink-streaming-contrib</module>
<module>flink-connector-wikiedits</module>
<module>flink-statebackend-rocksdb</module>
</modules>
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
new file mode 100644
index 0000000..23b5280
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+
+/**
+ * Sink that writes every record it receives to a socket connection opened back to the
+ * iterator returned by DataStreamUtils.collect().
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+@Internal
+class CollectSink<IN> extends RichSinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** IP address of the server socket to connect to. */
+	private final InetAddress hostIp;
+
+	/** Port of the server socket to connect to. */
+	private final int port;
+
+	/** Serializer used to write each record onto the connection. */
+	private final TypeSerializer<IN> serializer;
+
+	/** Connection to the server socket; created in {@link #open(Configuration)}. */
+	private transient Socket client;
+
+	/** Output stream of {@link #client}. */
+	private transient OutputStream outputStream;
+
+	/** Data-output view over {@link #outputStream} that the serializer writes to. */
+	private transient DataOutputViewStreamWrapper streamWriter;
+
+	/**
+	 * Creates a CollectSink that will send the data to the specified host.
+	 *
+	 * @param hostIp IP address of the Socket server.
+	 * @param port Port of the Socket server.
+	 * @param serializer A serializer for the data.
+	 */
+	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
+		this.hostIp = hostIp;
+		this.port = port;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Opens the connection to the server socket.
+	 *
+	 * @param parameters Configuration (not used).
+	 * @throws IOException if the connection cannot be established
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		try {
+			client = new Socket(hostIp, port);
+			outputStream = client.getOutputStream();
+			streamWriter = new DataOutputViewStreamWrapper(outputStream);
+		} catch (IOException e) {
+			throw new IOException("Cannot connect to the client to send back the stream", e);
+		}
+	}
+
+	/**
+	 * Serializes one record onto the connection.
+	 *
+	 * @param value The record to send.
+	 * @throws IOException wrapping any failure raised while serializing or writing
+	 */
+	@Override
+	public void invoke(IN value) throws Exception {
+		try {
+			serializer.serialize(value, streamWriter);
+		} catch (Exception e) {
+			throw new IOException("Error sending data back to client (" + hostIp.toString() + ":" + port + ')', e);
+		}
+	}
+
+	/**
+	 * Flushes and closes the connection to the server socket.
+	 *
+	 * <p>If the regular shutdown fails, the socket is still closed on a best-effort basis
+	 * before the failure is reported.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			flushAndCloseConnection();
+		} catch (Exception e) {
+			throw new IOException("Error while closing connection that streams data back to client at "
+				+ hostIp.toString() + ":" + port, e);
+		} finally {
+			closeClientQuietly();
+		}
+	}
+
+	/** Flushes and closes the output stream, then the socket; any failure propagates. */
+	private void flushAndCloseConnection() throws IOException {
+		if (outputStream != null) {
+			outputStream.flush();
+			outputStream.close();
+		}
+		if (client != null) {
+			client.close();
+		}
+	}
+
+	/** Fallback close of the socket that ignores any error. */
+	private void closeClientQuietly() {
+		if (client == null) {
+			return;
+		}
+		try {
+			client.close();
+		} catch (Throwable ignored) {
+			// best effort: the connection is being discarded anyway
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
new file mode 100644
index 0000000..59ad6a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+
+/**
+ * A collection of utilities for {@link DataStream DataStreams}.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+@PublicEvolving
+public final class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 *
+	 * <p>The returned iterator listens on a server socket; a sink with parallelism 1 that connects
+	 * back to it is added to the stream, and the stream's environment is executed in a
+	 * background thread. Failures of that execution are reported through the iterator.
+	 *
+	 * @param stream The DataStream whose elements should be collected.
+	 * @return The iterator
+	 * @throws IOException if no suitable address to receive the data on can be determined
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		TypeSerializer<OUT> serializer = stream.getType().createSerializer(env.getConfig());
+
+		// Resolve the address before opening the server socket, so that a failure here
+		// cannot leak the socket.
+		InetAddress clientAddress = determineClientAddress(env);
+
+		SocketStreamIterator<OUT> iter = new SocketStreamIterator<>(serializer);
+		try {
+			DataStreamSink<OUT> sink = stream.addSink(new CollectSink<>(clientAddress, iter.getPort(), serializer));
+			sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
+
+			(new CallExecute(env, iter)).start();
+		} catch (Throwable t) {
+			// nobody will ever consume the iterator, so release its server socket
+			iter.close();
+			throw t;
+		}
+
+		return iter;
+	}
+
+	/**
+	 * Determines the address of this machine that the CollectSink should connect back to.
+	 *
+	 * @param env The environment that will execute the streaming program.
+	 * @return The address the sink should connect to.
+	 * @throws IOException if no suitable address can be determined
+	 */
+	private static InetAddress determineClientAddress(StreamExecutionEnvironment env) throws IOException {
+		if (env instanceof RemoteStreamEnvironment) {
+			String host = ((RemoteStreamEnvironment) env).getHost();
+			int port = ((RemoteStreamEnvironment) env).getPort();
+			try {
+				return ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+			}
+			catch (Exception e) {
+				throw new IOException("Could not determine an suitable network address to " +
+						"receive back data from the streaming program.", e);
+			}
+		} else if (env instanceof LocalStreamEnvironment) {
+			return InetAddress.getLoopbackAddress();
+		} else {
+			try {
+				return InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new IOException("Could not determine this machines own local address to " +
+						"receive back data from the streaming program.", e);
+			}
+		}
+	}
+
+	/**
+	 * Thread that executes the program and forwards any failure to the iterator.
+	 */
+	private static class CallExecute extends Thread {
+
+		private final StreamExecutionEnvironment toTrigger;
+		private final SocketStreamIterator<?> toNotify;
+
+		private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
+			this.toTrigger = toTrigger;
+			this.toNotify = toNotify;
+		}
+
+		@Override
+		public void run() {
+			try {
+				toTrigger.execute();
+			}
+			catch (Throwable t) {
+				toNotify.notifyOfError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private DataStreamUtils() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
new file mode 100644
index 0000000..871c0f7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An iterator that returns the data from a socket stream.
+ *
+ * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
+ * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
+ * deserializing, and returning the data from that socket.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ *
+ * @param <T> The type of elements returned from the iterator.
+ */
+@PublicEvolving
+class SocketStreamIterator<T> implements Iterator<T> {
+
+	/** Server socket to listen at. */
+	private final ServerSocket socket;
+
+	/** Serializer to deserialize stream. */
+	private final TypeSerializer<T> serializer;
+
+	/** Set by the same thread that reads it. */
+	private DataInputViewStreamWrapper inStream;
+
+	/** Next element, handover from hasNext() to next(). */
+	private T next;
+
+	/** Set once the connected socket reached end-of-stream; no further reads are attempted. */
+	private boolean endOfStream;
+
+	/**
+	 * The socket for the specific stream. Volatile so that {@link #notifyOfError(Throwable)},
+	 * which may be called from another thread, sees the socket once it has been accepted.
+	 */
+	private volatile Socket connectedSocket;
+
+	/** Async error, for example by the executor of the program that produces the stream. */
+	private volatile Throwable error;
+
+	/**
+	 * Creates the iterator and opens a server socket on an automatically allocated port.
+	 *
+	 * @param serializer Serializer used to deserialize the received elements.
+	 * @throws RuntimeException if the server socket cannot be opened
+	 */
+	SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
+		this.serializer = serializer;
+		try {
+			socket = new ServerSocket(0, 1);
+		}
+		catch (IOException e) {
+			// keep the original failure as the cause instead of dropping it
+			throw new RuntimeException("Could not open socket to receive back stream results", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the port on which the iterator is getting the data. (Used internally.)
+	 * @return The port
+	 */
+	public int getPort() {
+		return socket.getLocalPort();
+	}
+
+	/**
+	 * Returns the local address the server socket is bound to.
+	 * @return The bind address
+	 */
+	public InetAddress getBindAddress() {
+		return socket.getInetAddress();
+	}
+
+	/**
+	 * Closes the connection from the producer (if any) and the server socket, ignoring errors.
+	 */
+	public void close() {
+		closeQuietly(connectedSocket);
+		closeQuietly(socket);
+	}
+
+	// ------------------------------------------------------------------------
+	//  iterator semantics
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns true if the DataStream has more elements.
+	 * (Note: blocks if there will be more elements, but they are not available yet.)
+	 * @return true if the DataStream has more elements
+	 * @throws RuntimeException if receiving the next element failed
+	 */
+	@Override
+	public boolean hasNext() {
+		if (next == null && !endOfStream) {
+			try {
+				next = readNextFromStream();
+			} catch (Exception e) {
+				throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e);
+			}
+		}
+
+		return next != null;
+	}
+
+	/**
+	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+	 * @return The element
+	 * @throws NoSuchElementException if the stream has already ended
+	 */
+	@Override
+	public T next() {
+		if (hasNext()) {
+			T current = next;
+			next = null;
+			return current;
+		} else {
+			throw new NoSuchElementException();
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	/**
+	 * Reads the next element, accepting the producer's connection on the first call.
+	 *
+	 * @return The next element, or {@code null} once the producer closed the connection.
+	 * @throws Exception the read failure, or a wrapper around the asynchronously reported error
+	 */
+	private T readNextFromStream() throws Exception {
+		try {
+			if (inStream == null) {
+				connectedSocket = socket.accept();
+				inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream());
+			}
+
+			return serializer.deserialize(inStream);
+		}
+		catch (EOFException e) {
+			// the producer closed the connection: the stream is finished
+			endOfStream = true;
+			closeQuietly(connectedSocket);
+			closeQuietly(socket);
+			return null;
+		}
+		catch (Exception e) {
+			if (error == null) {
+				throw e;
+			}
+			else {
+				// throw the root cause error
+				throw new Exception("Receiving stream failed: " + error.getMessage(), error);
+			}
+		}
+	}
+
+	/**
+	 * Closes the given socket, ignoring a {@code null} argument and any error.
+	 */
+	private static void closeQuietly(Closeable closeable) {
+		if (closeable != null) {
+			try {
+				closeable.close();
+			} catch (Throwable ignored) {
+				// best effort
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  errors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reports an asynchronous error, e.g. a failure of the program producing the stream.
+	 *
+	 * <p>The error is recorded unless one was already reported. Both sockets are then closed so that
+	 * a thread blocked in {@link #hasNext()} wakes up and fails with an exception wrapping this error.
+	 *
+	 * @param error The error to report; ignored if {@code null}.
+	 */
+	public void notifyOfError(Throwable error) {
+		if (error != null && this.error == null) {
+			this.error = error;
+
+			// this should wake up any blocking calls
+			closeQuietly(connectedSocket);
+			closeQuietly(socket);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
new file mode 100644
index 0000000..57e637c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package holds classes that are experimental.
+ *
+ * <p>They are NOT battle-tested code and may be changed or removed in future versions.
+ *
+ * <p>None of the classes should be @Public.
+ */
+package org.apache.flink.streaming.experimental;
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
new file mode 100644
index 0000000..718d46c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.net.Socket;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the SocketStreamIterator.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+public class SocketStreamIteratorTest {
+
+	/**
+	 * Streams a seeded sequence of random longs into the iterator and checks that exactly
+	 * that sequence is returned, followed by the end of the stream.
+	 */
+	@Test
+	public void testIterator() throws Exception {
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final long seed = new Random().nextLong();
+		final int numElements = 1000;
+
+		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
+
+		Thread writer = new Thread() {
+
+			@Override
+			public void run() {
+				try {
+					try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort());
+							DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) {
+
+						final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
+						final Random rnd = new Random(seed);
+
+						for (int i = 0; i < numElements; i++) {
+							serializer.serialize(rnd.nextLong(), out);
+						}
+					}
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+
+		writer.start();
+
+		final Random validator = new Random(seed);
+		for (int i = 0; i < numElements; i++) {
+			assertTrue(iterator.hasNext());
+			assertTrue(iterator.hasNext());
+			assertEquals(validator.nextLong(), iterator.next().longValue());
+		}
+
+		assertFalse(iterator.hasNext());
+		writer.join();
+
+		// the writer records its failures instead of throwing; surface them here
+		final Throwable writerError = error.get();
+		if (writerError != null) {
+			throw new AssertionError("Writer thread failed", writerError);
+		}
+
+		assertFalse(iterator.hasNext());
+	}
+
+	/**
+	 * Checks that an error reported asynchronously via notifyOfError() makes a pending
+	 * hasNext() call fail with that error in its cause chain.
+	 */
+	@Test
+	public void testIteratorWithException() throws Exception {
+
+		final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE);
+
+		// asynchronously set an error
+		Thread errorReporter = new Thread() {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(100);
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
+				iterator.notifyOfError(new Exception("test"));
+			}
+		};
+		errorReporter.start();
+
+		try {
+			iterator.hasNext();
+			fail("hasNext() should have failed with the asynchronously reported error");
+		}
+		catch (Exception e) {
+			assertTrue(e.getCause().getMessage().contains("test"));
+		}
+
+		errorReporter.join();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
new file mode 100644
index 0000000..8c4beff
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.experimental.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.experimental.{DataStreamUtils => JavaStreamUtils}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+ * Wraps a Scala [[DataStream]] so that its elements can be collected on the client,
+ * delegating to the Java `org.apache.flink.streaming.experimental.DataStreamUtils`.
+ *
+ * This experimental class is relocated from flink-streaming-contrib.
+ *
+ * @param self the DataStream whose elements are collected
+ */
+@PublicEvolving
+class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
+
+  /**
+   * Returns a Scala iterator over the elements of the wrapped DataStream.
+   *
+   * The Java iterator produced by the Java `DataStreamUtils.collect` is adapted
+   * with `asScala`; calling this method starts executing the stream's environment.
+   *
+   * @return The iterator
+   */
+  def collect(): Iterator[T] = {
+    val javaIterator = JavaStreamUtils.collect(self.javaStream)
+    javaIterator.asScala
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/907361d8/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
new file mode 100644
index 0000000..ad07390
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.experimental;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.experimental.DataStreamUtils;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test verifies the behavior of DataStreamUtils.collect().
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
+ * for more information.
+ */
+public class CollectITCase extends TestLogger {
+
+	/**
+	 * Collects the sequence 1..n from a job running on a local mini cluster and checks that
+	 * every element arrives exactly once and in order.
+	 */
+	@Test
+	public void testCollect() throws Exception {
+		final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
+		try {
+			cluster.start();
+			TestStreamEnvironment.setAsContext(cluster, 1);
+
+			final long n = 10;
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			final DataStream<Long> stream = env.generateSequence(1, n);
+
+			final Iterator<Long> results = DataStreamUtils.collect(stream);
+			long expected = 1;
+			while (results.hasNext()) {
+				assertEquals("received wrong element", expected, (long) results.next());
+				expected++;
+			}
+
+			// after n elements the counter has moved one past the last value
+			assertEquals("received wrong number of elements", n + 1, expected);
+		} finally {
+			TestStreamEnvironment.unsetAsContext();
+			cluster.stop();
+		}
+	}
+}