Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:48:59 UTC

[1/7] flink git commit: [FLINK-2698] Add trailing newline to flink-conf.yaml

Repository: flink
Updated Branches:
  refs/heads/master 482d4373f -> 057109488


[FLINK-2698] Add trailing newline to flink-conf.yaml

This closes #1142


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db49a3f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db49a3f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db49a3f0

Branch: refs/heads/master
Commit: db49a3f097e1c1b9f87f5ec0dfba23d9a5faab1a
Parents: 482d437
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 17 14:54:48 2015 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:35:11 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/resources/flink-conf.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db49a3f0/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 8ecd59c..4dd8173 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -139,4 +139,4 @@ state.backend: jobmanager
 #
 # recovery.mode: zookeeper
 #
-# ha.zookeeper.quorum: localhost
\ No newline at end of file
+# ha.zookeeper.quorum: localhost


[6/7] flink git commit: [FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.

Posted by se...@apache.org.
[FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a74fa8ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a74fa8ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a74fa8ce

Branch: refs/heads/master
Commit: a74fa8ce93472c6d022852e8a4c1827d71f1bee6
Parents: b9148b6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 18 12:01:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:01:01 2015 +0200

----------------------------------------------------------------------
 flink-examples/flink-java-examples/pom.xml      | 73 ++++++--------------
 .../flink/examples/java/distcp/DistCp.java      | 10 ++-
 .../examples/java/distcp/FileCopyTask.java      |  7 +-
 .../java/distcp/FileCopyTaskInputFormat.java    |  8 ++-
 .../java/distcp/FileCopyTaskInputSplit.java     | 12 ++--
 5 files changed, 50 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 0ed2df6..d90d034 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -197,56 +197,7 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-					
-					<!-- These queries are currently not self-contained -->
-
-					<!-- TPC-H Query 10 -->
 
-					<!--
-					<execution>
-						<id>TPCHQuery10</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TPCHQuery10</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.relational.TPCHQuery10</program-class>
-								</manifestEntries>
-							</archive>
-							<includes>
-								<include>**/java/relational/TPCHQuery10.class</include>
-								<include>**/java/relational/TPCHQuery10$*.class</include>
-							</includes>
-						</configuration>
-					</execution> -->
-				
-					<!-- TPC-H Query 3 -->
-					<!--
-					<execution>
-						<id>TPCHQuery3</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TPCHQuery3</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.relational.TPCHQuery3</program-class>
-								</manifestEntries>
-							</archive>
-							<includes>
-								<include>**/java/relational/TPCHQuery3.class</include>
-								<include>**/java/relational/TPCHQuery3$*.class</include>
-							</includes>
-						</configuration>
-					</execution> -->
-		
 					<!-- WebLogAnalysis -->
 					<execution>
 						<id>WebLogAnalysis</id>
@@ -319,7 +270,28 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-					
+
+					<!-- Distributed Copy -->
+					<execution>
+						<id>DistCp</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DistCp</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/distcp/*</include>
+							</includes>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 
@@ -346,6 +318,7 @@ under the License.
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCountPOJO.jar" tofile="${project.basedir}/target/WordCountPOJO.jar" />
+								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
 							</target>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
index 3eae211..08f90a6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -19,6 +19,7 @@
 package org.apache.flink.examples.java.distcp;
 
 import org.apache.commons.io.IOUtils;
+
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -34,6 +35,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ import java.util.Map;
  * However, in a distributed environment HDFS paths must be provided both as input and output.
  */
 public class DistCp {
+	
 	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
 	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
 	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
@@ -61,7 +64,7 @@ public class DistCp {
 	public static void main(String[] args) throws Exception {
 		if (args.length != 3) {
 			printHelp();
-			System.exit(1);
+			return;
 		}
 
 		final Path sourcePath = new Path(args[0]);
@@ -84,8 +87,9 @@ public class DistCp {
 
 
 		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
-			public LongCounter fileCounter;
-			public LongCounter bytesCounter;
+			
+			private LongCounter fileCounter;
+			private LongCounter bytesCounter;
 
 			@Override
 			public void open(Configuration parameters) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
index 3778775..7f38a8b 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -27,8 +27,11 @@ import java.io.Serializable;
  * A Java POJO that represents a task for copying a single file
  */
 public class FileCopyTask implements Serializable {
-	private Path path;
-	private String relativePath;
+	
+	private static final long serialVersionUID = -8760082278978316032L;
+	
+	private final Path path;
+	private final String relativePath;
 
 	public FileCopyTask(Path path, String relativePath) {
 		if (StringUtils.isEmpty(relativePath)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index 3ac872a..6137e12 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -38,7 +38,12 @@ import java.util.Queue;
  * that have finished previously assigned tasks
  */
 public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+
+	private static final long serialVersionUID = -644394866425221151L;
+	
 	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+	
+
 	private final List<FileCopyTask> tasks;
 
 	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
@@ -74,7 +79,8 @@ public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCo
 		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
 		int i = 0;
 		for (FileCopyTask t : tasks) {
-			splits[i++] = new FileCopyTaskInputSplit(t);
+			splits[i] = new FileCopyTaskInputSplit(t, i);
+			i++;
 		}
 		return splits;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
index 8ee5e09..33943b6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -24,11 +24,15 @@ import org.apache.flink.core.io.InputSplit;
  * Implementation of {@code InputSplit} for copying files
  */
 public class FileCopyTaskInputSplit implements InputSplit {
-	private int splitNo = 0;
-	private FileCopyTask task;
+	
+	private static final long serialVersionUID = -7621656017747660450L;
+	
+	private final FileCopyTask task;
+	private final int splitNumber;
 
-	public FileCopyTaskInputSplit(FileCopyTask task) {
+	public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
 		this.task = task;
+		this.splitNumber = splitNumber;
 	}
 
 	public FileCopyTask getTask() {
@@ -37,6 +41,6 @@ public class FileCopyTaskInputSplit implements InputSplit {
 
 	@Override
 	public int getSplitNumber() {
-		return splitNo++;
+		return splitNumber;
 	}
 }
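
A quick illustration of why the stable split number matters (a minimal sketch; the path and split index below are made-up values): the old post-increment meant getSplitNumber() returned a different value on every call, while a split's number must be constant.

    import org.apache.flink.core.fs.Path
    import org.apache.flink.examples.java.distcp.{FileCopyTask, FileCopyTaskInputSplit}

    object SplitNumberCheck {
      def main(args: Array[String]): Unit = {
        val task = new FileCopyTask(new Path("/tmp/a"), "a") // illustrative task
        val split = new FileCopyTaskInputSplit(task, 3)
        // stable after this commit; the old `splitNo++` yielded 3, then 4, ...
        assert(split.getSplitNumber == split.getSplitNumber)
      }
    }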


[2/7] flink git commit: [FLINK-2701] [streaming] Add getter for wrapped Java environment to Scala ExecutionEnvironment

Posted by se...@apache.org.
[FLINK-2701] [streaming] Add getter for wrapped Java environment to Scala ExecutionEnvironment

This closes #1120


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dcc38d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dcc38d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dcc38d5

Branch: refs/heads/master
Commit: 6dcc38d532e68a08b0ba04d0d58f652e4140a68c
Parents: db49a3f
Author: Alexander Kolb <al...@ottogroup.com>
Authored: Fri Sep 11 11:23:27 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:38:03 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6dcc38d5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0eb3297..5e02ec5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -509,6 +509,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getStreamGraph = javaEnv.getStreamGraph
 
   /**
+   * Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
+   * @return The encased ExecutionEnvironment
+   */
+  def getWrappedStreamExecutionEnvironment = javaEnv
+
+  /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
    * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
    */
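
A minimal usage sketch of the new getter (the environment setup and the println are illustrative, not part of the commit):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object WrappedEnvExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // hand the wrapped Java environment to code written against the Java API
        val javaEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment =
          env.getWrappedStreamExecutionEnvironment
        println(javaEnv.getParallelism)
      }
    }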


[3/7] flink git commit: [FLINK-2557] [core] TypeExtractor properly returns MissingTypeInfo

Posted by se...@apache.org.
[FLINK-2557] [core] TypeExtractor properly returns MissingTypeInfo

This closes #1045


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe9145d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe9145d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe9145d

Branch: refs/heads/master
Commit: 3fe9145df37fd353229d1de297cc134f3a553c07
Parents: 6dcc38d
Author: zentol <s....@web.de>
Authored: Sun Aug 23 22:06:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:40:28 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |  6 +++---
 .../java/type/extractor/TypeExtractorTest.java  | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fe9145d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0196b5d..252842e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -704,8 +704,8 @@ public class TypeExtractor {
 			return parameter;
 		}
 		
-		throw new IllegalArgumentException("The types of the interface " + baseClass.getName() + " could not be inferred. " + 
-						"Support for synthetic interfaces, lambdas, and generic types is limited at this point.");
+		throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + 
+						"Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
 	}
 	
 	private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
@@ -755,7 +755,7 @@ public class TypeExtractor {
 		try {
 			inType = getParameterType(baseClass, typeHierarchy, clazz, inputParamPos);
 		}
-		catch (IllegalArgumentException e) {
+		catch (InvalidTypesException e) {
 			return; // skip input validation e.g. for raw types
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe9145d/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 7f0cf5b..eae767d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -848,6 +849,25 @@ public class TypeExtractorTest {
 		}
 	}
 
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	@Test
+	public void testFunctionWithMissingGenericsAndReturns() {
+		RichMapFunction function = new RichMapFunction() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Object map(Object value) throws Exception {
+				return null;
+			}
+		};
+
+		TypeInformation info = ExecutionEnvironment.getExecutionEnvironment()
+				.fromElements("arbitrary", "data")
+				.map(function).returns("String").getResultType();
+
+		Assert.assertEquals(TypeInfoParser.parse("String"), info);
+	}
+
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testFunctionDependingOnInputAsSuperclass() {


[5/7] flink git commit: [FLINK-2702] [examples] Add a distributed copy utility example

Posted by se...@apache.org.
[FLINK-2702] [examples] Add a distributed copy utility example

This closes #1090


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9148b66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9148b66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9148b66

Branch: refs/heads/master
Commit: b9148b667d24d4d30a0fd877848c1178e4be647a
Parents: 0c5ebc8
Author: Vyacheslav Zholudev <vy...@researchgate.com>
Authored: Thu Sep 3 16:09:18 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:49:51 2015 +0200

----------------------------------------------------------------------
 .../flink/examples/java/distcp/DistCp.java      | 178 +++++++++++++++++++
 .../examples/java/distcp/FileCopyTask.java      |  56 ++++++
 .../java/distcp/FileCopyTaskInputFormat.java    | 110 ++++++++++++
 .../java/distcp/FileCopyTaskInputSplit.java     |  42 +++++
 4 files changed, 386 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
new file mode 100644
index 0000000..3eae211
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A main class of the Flink distcp utility.
+ * It's a simple reimplementation of Hadoop distcp
+ * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
+ * with a dynamic input format
+ * Note that this tool does not deal with retriability. Additionally, empty directories are not copied over.
+ * <p/>
+ * When running locally, local file systems paths can be used.
+ * However, in a distributed environment HDFS paths must be provided both as input and output.
+ */
+public class DistCp {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
+	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
+	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
+
+	public static void main(String[] args) throws Exception {
+		if (args.length != 3) {
+			printHelp();
+			System.exit(1);
+		}
+
+		final Path sourcePath = new Path(args[0]);
+		final Path targetPath = new Path(args[1]);
+		int parallelism = Integer.valueOf(args[2], 10);
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		checkInputParams(env, sourcePath, targetPath, parallelism);
+		env.setParallelism(parallelism);
+
+		long startTime = System.currentTimeMillis();
+		LOGGER.info("Initializing copy tasks");
+		List<FileCopyTask> tasks = getCopyTasks(sourcePath);
+		LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms");
+
+		DataSet<FileCopyTask> inputTasks = new DataSource<>(env,
+				new FileCopyTaskInputFormat(tasks),
+				new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
+
+
+		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
+			public LongCounter fileCounter;
+			public LongCounter bytesCounter;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
+				fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
+			}
+
+			@Override
+			public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
+				LOGGER.info("Processing task: " + task);
+				Path outPath = new Path(targetPath, task.getRelativePath());
+
+				FileSystem targetFs = targetPath.getFileSystem();
+				// creating parent folders in case of a local FS
+				if (!targetFs.isDistributedFS()) {
+					//dealing with cases like file:///tmp or just /tmp
+					File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
+					File parentFile = outFile.getParentFile();
+					if (!parentFile.mkdirs() && !parentFile.exists()) {
+						throw new RuntimeException("Cannot create local file system directories: " + parentFile);
+					}
+				}
+				FSDataOutputStream outputStream = null;
+				FSDataInputStream inputStream = null;
+				try {
+					outputStream = targetFs.create(outPath, true);
+					inputStream = task.getPath().getFileSystem().open(task.getPath());
+					int bytes = IOUtils.copy(inputStream, outputStream);
+					bytesCounter.add(bytes);
+				} finally {
+					IOUtils.closeQuietly(inputStream);
+					IOUtils.closeQuietly(outputStream);
+				}
+				fileCounter.add(1l);
+			}
+		});
+
+		// no data sinks are needed, therefore just printing an empty result
+		res.print();
+
+		Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
+		LOGGER.info("== COUNTERS ==");
+		for (Map.Entry<String, Object> e : accumulators.entrySet()) {
+			LOGGER.info(e.getKey() + ": " + e.getValue());
+		}
+	}
+
+
+	// -----------------------------------------------------------------------------------------
+	// HELPER METHODS
+	// -----------------------------------------------------------------------------------------
+
+	private static void checkInputParams(ExecutionEnvironment env, Path sourcePath, Path targetPath, int parallelism) throws IOException {
+		if (parallelism <= 0) {
+			throw new IllegalArgumentException("Parallelism should be greater than 0");
+		}
+
+		boolean isLocal = env instanceof LocalEnvironment;
+		if (!isLocal &&
+				!(sourcePath.getFileSystem().isDistributedFS() && targetPath.getFileSystem().isDistributedFS())) {
+			throw new IllegalArgumentException("In a distributed mode only HDFS input/output paths are supported");
+		}
+	}
+
+	private static void printHelp() {
+		System.err.println("Usage: <input_path> <output_path> <level_of_parallelism>");
+	}
+
+	private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
+		List<FileCopyTask> tasks = new ArrayList<>();
+		getCopyTasks(sourcePath, "", tasks);
+		return tasks;
+	}
+
+	private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
+		FileStatus[] res = p.getFileSystem().listStatus(p);
+		if (res == null) {
+			return;
+		}
+		for (FileStatus fs : res) {
+			if (fs.isDir()) {
+				getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
+			} else {
+				Path cp = fs.getPath();
+				tasks.add(new FileCopyTask(cp, rel + cp.getName()));
+			}
+		}
+	}
+
+}
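
For reference, a sketch of a local run (the paths and parallelism are made-up values; as the class comment notes, plain file system paths only work when running locally):

    object DistCpLocalRun {
      def main(args: Array[String]): Unit = {
        // arguments: <input_path> <output_path> <level_of_parallelism>
        org.apache.flink.examples.java.distcp.DistCp.main(
          Array("/tmp/distcp-src", "/tmp/distcp-dst", "2"))
      }
    }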

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
new file mode 100644
index 0000000..3778775
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A Java POJO that represents a task for copying a single file
+ */
+public class FileCopyTask implements Serializable {
+	private Path path;
+	private String relativePath;
+
+	public FileCopyTask(Path path, String relativePath) {
+		if (StringUtils.isEmpty(relativePath)) {
+			throw new IllegalArgumentException("Relative path should not be empty for: " + path);
+		}
+		this.path = path;
+		this.relativePath = relativePath;
+	}
+
+	public Path getPath() {
+		return path;
+	}
+
+	public String getRelativePath() {
+		return relativePath;
+	}
+
+	@Override
+	public String toString() {
+		return "FileCopyTask{" +
+				"path=" + path +
+				", relativePath='" + relativePath + '\'' +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
new file mode 100644
index 0000000..3ac872a
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
+ * that have finished previously assigned tasks
+ */
+public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+	private final List<FileCopyTask> tasks;
+
+	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
+		this.tasks = tasks;
+	}
+
+	private class FileCopyTaskAssigner implements InputSplitAssigner {
+		private Queue<FileCopyTaskInputSplit> splits;
+
+		public FileCopyTaskAssigner(FileCopyTaskInputSplit[] inputSplits) {
+			splits = new LinkedList<>(Arrays.asList(inputSplits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			LOGGER.info("Getting copy task for task: " + taskId);
+			return splits.poll();
+		}
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		//no op
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		return null;
+	}
+
+	@Override
+	public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
+		int i = 0;
+		for (FileCopyTask t : tasks) {
+			splits[i++] = new FileCopyTaskInputSplit(t);
+		}
+		return splits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
+		return new FileCopyTaskAssigner(inputSplits);
+	}
+
+	private FileCopyTaskInputSplit curInputSplit = null;
+
+	@Override
+	public void open(FileCopyTaskInputSplit split) throws IOException {
+		curInputSplit = split;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return curInputSplit == null;
+	}
+
+	@Override
+	public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
+		FileCopyTask toReturn = curInputSplit.getTask();
+		curInputSplit = null;
+		return toReturn;
+	}
+
+	@Override
+	public void close() throws IOException {
+		//no op
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
new file mode 100644
index 0000000..8ee5e09
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * Implementation of {@code InputSplit} for copying files
+ */
+public class FileCopyTaskInputSplit implements InputSplit {
+	private int splitNo = 0;
+	private FileCopyTask task;
+
+	public FileCopyTaskInputSplit(FileCopyTask task) {
+		this.task = task;
+	}
+
+	public FileCopyTask getTask() {
+		return task;
+	}
+
+	@Override
+	public int getSplitNumber() {
+		return splitNo++;
+	}
+}


[4/7] flink git commit: [FLINK-2627] [scala api] Makes scala data set utils easier to access.

Posted by se...@apache.org.
[FLINK-2627] [scala api] Makes scala data set utils easier to access.

The required import is now org.apache.flink.api.scala.utils._
Also adds a method to create case class type information for Scala tuples.

This closes #1099


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c5ebc8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c5ebc8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c5ebc8b

Branch: refs/heads/master
Commit: 0c5ebc8b9361e48c32425f1235a86e5e1bf34976
Parents: 3fe9145
Author: Sachin Goel <sa...@gmail.com>
Authored: Sat Sep 5 18:42:38 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 11:42:28 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/scala/CrossDataSet.scala   |  24 +----
 .../apache/flink/api/scala/DataSetUtils.scala   | 103 ------------------
 .../api/scala/UnfinishedCoGroupOperation.scala  |  29 +----
 .../apache/flink/api/scala/joinDataSet.scala    |  23 +---
 .../org/apache/flink/api/scala/package.scala    |  33 +++++-
 .../apache/flink/api/scala/utils/package.scala  | 108 +++++++++++++++++++
 .../api/scala/StreamCrossOperator.scala         |  25 +----
 .../api/scala/StreamJoinOperator.scala          |  28 +----
 .../flink/streaming/api/scala/package.scala     |   8 +-
 .../api/scala/operators/SampleITCase.scala      |   4 +-
 .../api/scala/util/DataSetUtilsITCase.scala     |   6 +-
 11 files changed, 160 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
index 19818a0..c9e1540 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
@@ -17,14 +17,10 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{CrossFunction, RichCrossFunction}
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
@@ -111,25 +107,7 @@ private[flink] object CrossDataSet {
         (left, right)
       }
     }
-    val returnType = new CaseClassTypeInfo[(L, R)](
-      classOf[(L, R)],
-      Array(leftInput.getType, rightInput.getType),
-      Seq(leftInput.getType, rightInput.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
     val crossOperator = new CrossOperator[L, R, (L, R)](
       leftInput.javaSet,
       rightInput.javaSet,

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
deleted file mode 100644
index 793b201..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSetUtils.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{Utils, utils => jutils}
-
-import _root_.scala.language.implicitConversions
-import _root_.scala.reflect.ClassTag
-
-/**
- * This class provides simple utility methods for zipping elements in a data set with an index
- * or with a unique identifier.
- */
-
-class DataSetUtils[T](val self: DataSet[T]) {
-
-  /**
-   * Method that takes a set of subtask index, total number of elements mappings
-   * and assigns ids to all the elements from the input data set.
-   *
-   * @return a data set of tuple 2 consisting of consecutive ids and initial values.
-   */
-  def zipWithIndex(implicit ti: TypeInformation[(Long, T)],
-                   ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
-    wrap(jutils.DataSetUtils.zipWithIndex(self.javaSet))
-      .map { t => (t.f0.toLong, t.f1) }
-  }
-
-  /**
-   * Method that assigns a unique id to all the elements of the input data set.
-   *
-   * @return a data set of tuple 2 consisting of ids and initial values.
-   */
-  def zipWithUniqueId(implicit ti: TypeInformation[(Long, T)],
-                      ct: ClassTag[(Long, T)]): DataSet[(Long, T)] = {
-    wrap(jutils.DataSetUtils.zipWithUniqueId(self.javaSet))
-      .map { t => (t.f0.toLong, t.f1) }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Sample
-  // --------------------------------------------------------------------------------------------
-  /**
-   * Generate a sample of DataSet by the probability fraction of each element.
-   *
-   * @param withReplacement Whether element can be selected more than once.
-   * @param fraction        Probability that each element is chosen, should be [0,1] without
-   *                        replacement, and [0, ∞) with replacement. While fraction is larger
-   *                        than 1, the elements are expected to be selected multi times into
-   *                        sample on average.
-   * @param seed            Random number generator seed.
-   * @return The sampled DataSet
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong())
-            (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
-    wrap(jutils.DataSetUtils.sample(self.javaSet, withReplacement, fraction, seed))
-  }
-
-  /**
-   * Generate a sample of DataSet with fixed sample size.
-   * <p>
-   * <strong>NOTE:</strong> Sample with fixed size is not as efficient as sample with fraction,
-   * use sample with fraction unless you need exact precision.
-   * <p/>
-   *
-   * @param withReplacement Whether element can be selected more than once.
-   * @param numSample       The expected sample size.
-   * @param seed            Random number generator seed.
-   * @return The sampled DataSet
-   */
-  def sampleWithSize(withReplacement: Boolean, numSample: Int, seed: Long = Utils.RNG.nextLong())
-                    (implicit ti: TypeInformation[T], ct: ClassTag[T]): DataSet[T] = {
-
-    wrap(jutils.DataSetUtils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
-  }
-}
-
-object DataSetUtils {
-
-  /**
-   * Tie the new class to an existing Scala API class: DataSet.
-   */
-  implicit def utilsToDataSet[T: TypeInformation: ClassTag](dataSet: DataSet[T]) =
-    new DataSetUtils[T](dataSet)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index 98c8c31..91f8c85 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -63,31 +61,12 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
     // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class.
     val leftArrayType =
       ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType)
+        .asInstanceOf[TypeInformation[Array[L]]]
     val rightArrayType =
       ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType)
+        .asInstanceOf[TypeInformation[Array[R]]]
 
-    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
-      classOf[(Array[L], Array[R])],
-      Array(leftArrayType, rightArrayType),
-      Seq(leftArrayType, rightArrayType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(
-          executionConfig: ExecutionConfig): TypeSerializer[(Array[L], Array[R])] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(Array[L], Array[R])](
-          classOf[(Array[L], Array[R])],
-          fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[Array[L], Array[R]](leftArrayType, rightArrayType)
     val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
       leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
       null, // partitioner

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 0ed74f4..ecc1aab 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
@@ -232,25 +229,7 @@ class UnfinishedJoinOperation[L, R](
         out.collect((left, right))
       }
     }
-    val returnType = new CaseClassTypeInfo[(L, R)](
-      classOf[(L, R)],
-      Array(leftSet.getType, rightSet.getType),
-      Seq(leftSet.getType, rightSet.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
-          }
-        }
-      }
-    }
+    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
     val joinOperator = new EquiJoin[L, R, (L, R)](
       leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
         getCallLocationName())

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index a0c0b58..db9c68c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.api
 
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils}
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
 
 /**
  * The Flink Scala API. [[org.apache.flink.api.scala.ExecutionEnvironment]] is the starting-point
@@ -70,4 +73,28 @@ package object scala {
     }
     st(depth).toString
   }
+
+  def createTuple2TypeInformation[T1, T2](
+      t1: TypeInformation[T1],
+      t2: TypeInformation[T2])
+    : TypeInformation[(T1, T2)] =
+    new CaseClassTypeInfo[(T1, T2)](
+      classOf[(T1, T2)],
+      Array(t1, t2),
+      Seq(t1, t2),
+      Array("_1", "_2")) {
+
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(T1, T2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
+        }
+
+        new CaseClassSerializer[(T1, T2)](classOf[(T1, T2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2])
+          }
+        }
+      }
+    }
 }
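
A small sketch of calling the new helper directly (the component type infos are arbitrary choices for illustration):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.scala.createTuple2TypeInformation

    object Tuple2InfoExample {
      def main(args: Array[String]): Unit = {
        // builds a CaseClassTypeInfo[(String, Integer)] with fields _1 and _2
        val info = createTuple2TypeInformation(
          BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
        println(info)
      }
    }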

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
new file mode 100644
index 0000000..0d0f6e2
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.Utils
+import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.reflect.ClassTag
+
+package object utils {
+
+  /**
+   * This class provides simple utility methods for zipping elements in a data set with an index
+   * or with a unique identifier, sampling elements from a data set.
+   *
+   * @param self Data Set
+   */
+  implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
+
+    /**
+     * Method that takes a set of subtask index, total number of elements mappings
+     * and assigns ids to all the elements from the input data set.
+     *
+     * @return a data set of tuple 2 consisting of consecutive ids and initial values.
+     */
+    def zipWithIndex: DataSet[(Long, T)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Long, T](
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+        implicitly[TypeInformation[T]]
+      )
+      wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+    }
+
+    /**
+     * Method that assigns a unique id to all the elements of the input data set.
+     *
+     * @return a data set of tuple 2 consisting of ids and initial values.
+     */
+    def zipWithUniqueId: DataSet[(Long, T)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Long, T](
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+        implicitly[TypeInformation[T]]
+      )
+      wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Sample
+    // --------------------------------------------------------------------------------------------
+    /**
+     * Generate a sample of the DataSet by choosing each element with the given probability.
+     *
+     * @param withReplacement Whether an element can be selected more than once.
+     * @param fraction        Probability that each element is chosen; should be in [0, 1] without
+     *                        replacement, and in [0, ∞) with replacement. When the fraction is
+     *                        larger than 1, each element is expected to be selected multiple
+     *                        times into the sample on average.
+     * @param seed            Random number generator seed.
+     * @return The sampled DataSet
+     */
+    def sample(
+        withReplacement: Boolean,
+        fraction: Double,
+        seed: Long = Utils.RNG.nextLong())
+      : DataSet[T] = {
+      wrap(jutils.sample(self.javaSet, withReplacement, fraction, seed))
+    }
+
+    /**
+     * Generate a sample of the DataSet with a fixed sample size.
+     * <p>
+     * <strong>NOTE:</strong> Sampling with a fixed size is not as efficient as sampling with a
+     * fraction; use sampling with a fraction unless you need exact precision.
+     * </p>
+     *
+     * @param withReplacement Whether an element can be selected more than once.
+     * @param numSample       The expected sample size.
+     * @param seed            Random number generator seed.
+     * @return The sampled DataSet
+     */
+    def sampleWithSize(
+        withReplacement: Boolean,
+        numSample: Int,
+        seed: Long = Utils.RNG.nextLong())
+      : DataSet[T] = {
+      wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
+    }
+  }
+
+}
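
As a quick usage sketch (editorial note, not part of the commit): with the new
package object imported, the utility methods become available directly on a Scala
DataSet. The object and value names below are illustrative only.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._

    object DataSetUtilsExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val words: DataSet[String] = env.fromElements("flink", "scala", "utils")

        // Pairs each element with a consecutive id: (0, _), (1, _), (2, _).
        val indexed: DataSet[(Long, String)] = words.zipWithIndex

        // Keeps each element with probability 0.5, without replacement.
        val sampled: DataSet[String] = words.sample(withReplacement = false, fraction = 0.5)

        indexed.print()
      }
    }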

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index 0e01eee..0060a9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -20,13 +20,10 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
 import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 
@@ -40,26 +37,8 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
     val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
       (l: I1, r: I2) => (l, r))
 
-    val returnType = new CaseClassTypeInfo[(I1, I2)](
-      classOf[(I1, I2)],
-      Array(input1.getType, input2.getType),
-      Seq(input1.getType, input2.getType),
-      Array("_1", "_2")) {
-
-      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer(executionConfig)
-        }
-
-        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-          }
-        }
-      }
-    }
 
+    val returnType = createTuple2TypeInformation[I1, I2](input1.getType, input2.getType)
     val javaStream = input1.connect(input2).addGeneralWindowCombine(
       crossWindowFunction,
       returnType, windowSize,

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index e872851..e2be44f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -20,17 +20,13 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 
 import scala.Array.canBuildFrom
@@ -155,27 +151,7 @@ object StreamJoinOperator {
 
     private def createJoinOperator(): JavaStream[(I1, I2)] = {
 
-      val returnType = new CaseClassTypeInfo[(I1, I2)](
-        classOf[(I1, I2)],
-        Array(op.input1.getType, op.input2.getType),
-        Seq(op.input1.getType, op.input2.getType),
-        Array("_1", "_2")) {
-
-        override def createSerializer(
-            executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
-          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-          for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer(executionConfig)
-          }
-
-          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-            override def createInstance(fields: Array[AnyRef]) = {
-              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-            }
-          }
-        }
-      }
-
+      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
       op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
         .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
           returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 4c7be5e..2eb4f9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api
 import _root_.scala.reflect.ClassTag
 import language.experimental.macros
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaS
 import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
 import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
 import language.implicitConversions
-import org.apache.flink.streaming.api.windowing.StreamWindow
 
 package object scala {
   // We have this here so that we always have generated TypeInformationS when
@@ -72,4 +72,10 @@ package object scala {
           "supported on Case Classes (for now).")
     }
   }
+
+  def createTuple2TypeInformation[T1, T2](
+      t1: TypeInformation[T1],
+      t2: TypeInformation[T2])
+    : TypeInformation[(T1, T2)] =
+    apiTupleCreator[T1, T2](t1, t2)
 }
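
As an illustration (editorial note, not part of the commit), the shared helper can be
used as follows; the cast mirrors the LONG_TYPE_INFO cast used in the batch API above,
and the value names are hypothetical.

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.streaming.api.scala._

    // TypeInformation for (String, Int) pairs, as now produced by cross/join.
    val stringInfo: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
    val intInfo = BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]
    val pairInfo: TypeInformation[(String, Int)] = createTuple2TypeInformation(stringInfo, intInfo)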

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
index 86b0818..1b62824 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
@@ -21,14 +21,14 @@ import java.util.{List => JavaList, Random}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
 
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/flink/blob/0c5ebc8b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index d973908..7fff8ff 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -19,11 +19,11 @@
 package org.apache.flink.api.scala.util
 
 import org.apache.flink.api.scala._
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit._
-import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet
 
 @RunWith(classOf[Parameterized])
 class DataSetUtilsITCase (


[7/7] flink git commit: [FLINK-2704] [streaming] Clean up naming of State/Checkpoint Interfaces

Posted by se...@apache.org.
[FLINK-2704] [streaming] Clean up naming of State/Checkpoint Interfaces

This closes #671

The interfaces are implemented by StreamTask (for now) but were named
*Operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05710948
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05710948
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05710948

Branch: refs/heads/master
Commit: 0571094885da2766dfb52a6fb38020cab7602114
Parents: a74fa8c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 10 15:45:53 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:04:18 2015 +0200

----------------------------------------------------------------------
 .../tasks/CheckpointNotificationOperator.java   | 35 ------------
 .../jobgraph/tasks/CheckpointedOperator.java    | 38 -------------
 .../jobgraph/tasks/OperatorStateCarrier.java    | 38 -------------
 .../runtime/jobgraph/tasks/StatefulTask.java    | 57 ++++++++++++++++++++
 .../apache/flink/runtime/state/StateUtils.java  |  6 +--
 .../apache/flink/runtime/taskmanager/Task.java  | 22 ++++----
 .../runtime/taskmanager/TaskAsyncCallTest.java  | 13 +++--
 .../streaming/runtime/tasks/StreamTask.java     |  7 +--
 8 files changed, 81 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
deleted file mode 100644
index 0eb9e07..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-/**
- * This interface needs to be implemented by runtime tasks that want to be able to receive
- * notifications about completed checkpoints.
- */
-public interface CheckpointNotificationOperator {
-
-	/**
-	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
-	 * the notification from all participating tasks.
-	 * 
-	 * @param checkpointId The ID of the checkpoint that is complete..
-	 * @throws Exception The notification method may forward its exceptions.
-	 */
-	void notifyCheckpointComplete(long checkpointId) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
deleted file mode 100644
index 60f70dc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-/**
- * This interface must be implemented by invokable operators (subclasses
- * of {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} that
- * participate in state checkpoints.
- */
-public interface CheckpointedOperator {
-
-	/**
-	 * This method is either called directly and asynchronously by the checkpoint
-	 * coordinator (in the case of functions that are directly notified - usually
-	 * the data sources), or called synchronously when all incoming channels have
-	 * reported a checkpoint barrier.
-	 * 
-	 * @param checkpointId The ID of the checkpoint, incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
-	 */
-	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
deleted file mode 100644
index 5045ca4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * This interface must be implemented by any invokable that has recoverable state.
- * The method {@link #setInitialState(org.apache.flink.runtime.state.StateHandle)} is used
- * to set the initial state of the operator, upon recovery.
- */
-public interface OperatorStateCarrier<T extends StateHandle<?>> {
-
-	/**
-	 * Sets the initial state of the operator, upon recovery. The initial state is typically
-	 * a snapshot of the state from a previous execution.
-	 * 
-	 * @param stateHandle The handle to the state.
-	 */
-	void setInitialState(T stateHandle) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
new file mode 100644
index 0000000..894e6d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * This interface must be implemented by any invokable that has recoverable state and participates
+ * in checkpointing.
+ */
+public interface StatefulTask<T extends StateHandle<?>> {
+
+	/**
+	 * Sets the initial state of the operator, upon recovery. The initial state is typically
+	 * a snapshot of the state from a previous execution.
+	 * 
+	 * @param stateHandle The handle to the state.
+	 */
+	void setInitialState(T stateHandle) throws Exception;
+
+	/**
+	 * This method is either called directly and asynchronously by the checkpoint
+	 * coordinator (in the case of functions that are directly notified - usually
+	 * the data sources), or called synchronously when all incoming channels have
+	 * reported a checkpoint barrier.
+	 *
+	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 */
+	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+
+
+	/**
+	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
+	 * the notification from all participating tasks.
+	 *
+	 * @param checkpointId The ID of the checkpoint that is complete.
+	 * @throws Exception The notification method may forward its exceptions.
+	 */
+	void notifyCheckpointComplete(long checkpointId) throws Exception;
+}
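
A minimal sketch (editorial note, not part of the commit) of an invokable wired to the
consolidated interface. The class name is hypothetical, and the sketch assumes the
AbstractInvokable of this codebase with registerInputOutput() and invoke(); it is
written in Scala for brevity.

    import java.io.Serializable

    import org.apache.flink.runtime.jobgraph.tasks.{AbstractInvokable, StatefulTask}
    import org.apache.flink.runtime.state.StateHandle

    class CountingTask extends AbstractInvokable with StatefulTask[StateHandle[Serializable]] {

      // Illustrative piece of recoverable task state.
      private var count: Long = 0

      override def setInitialState(stateHandle: StateHandle[Serializable]): Unit = {
        // A real task would restore `count` from the snapshot carried by the handle.
      }

      override def triggerCheckpoint(checkpointId: Long, timestamp: Long): Unit = {
        // A real task would snapshot `count` and acknowledge the checkpoint here.
      }

      override def notifyCheckpointComplete(checkpointId: Long): Unit = {
        // Safe point to commit external side effects tied to `checkpointId`.
      }

      override def registerInputOutput(): Unit = ()

      override def invoke(): Unit = {
        count += 1
      }
    }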

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 30daec1..88b0d18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 /**
  * A collection of utility methods for dealing with operator state.
@@ -41,10 +41,10 @@ public class StateUtils {
 	 * @param <T>
 	 *            Type bound for the
 	 */
-	public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op,
+	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
 			StateHandle<?> state) throws Exception {
 		@SuppressWarnings("unchecked")
-		OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
+		StatefulTask<T> typedOp = (StatefulTask<T>) op;
 		@SuppressWarnings("unchecked")
 		T typedHandle = (T) state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 0d1dd13..7eff45a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,9 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
@@ -534,10 +532,10 @@ public class Task implements Runnable {
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
 
 			if (operatorState != null) {
-				if (invokable instanceof OperatorStateCarrier) {
+				if (invokable instanceof StatefulTask) {
 					try {
 						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
-						OperatorStateCarrier<?> op = (OperatorStateCarrier<?>) invokable;
+						StatefulTask<?> op = (StatefulTask<?>) invokable;
 						StateUtils.setOperatorState(op, state);
 					}
 					catch (Exception e) {
@@ -869,7 +867,7 @@ public class Task implements Runnable {
 
 	/**
 	 * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
-	 * {@link org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator}.
+	 * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.   
@@ -878,10 +876,10 @@ public class Task implements Runnable {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof CheckpointedOperator) {
+			if (invokable instanceof StatefulTask) {
 
 				// build a local closure 
-				final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable;
+				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
@@ -889,7 +887,7 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
 						}
 						catch (Throwable t) {
 							failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
@@ -912,10 +910,10 @@ public class Task implements Runnable {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof CheckpointNotificationOperator) {
+			if (invokable instanceof StatefulTask) {
 
 				// build a local closure 
-				final CheckpointNotificationOperator checkpointer = (CheckpointNotificationOperator) invokable;
+				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
@@ -923,7 +921,7 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							checkpointer.notifyCheckpointComplete(checkpointID);
+							statefulTask.notifyCheckpointComplete(checkpointID);
 						}
 						catch (Throwable t) {
 							// fail task if checkpoint confirmation failed.

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7857123..c0fe750 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -37,15 +37,16 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.junit.Before;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -168,8 +169,7 @@ public class TaskAsyncCallTest {
 				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 	
-	public static class CheckpointsInOrderInvokable extends AbstractInvokable
-			implements CheckpointedOperator, CheckpointNotificationOperator {
+	public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
 		private volatile long lastCheckpointId = 0;
 		
@@ -196,6 +196,11 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
+
+		}
+
+		@Override
 		public void triggerCheckpoint(long checkpointId, long timestamp) {
 			lastCheckpointId++;
 			if (checkpointId == lastCheckpointId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d5bdce2..d357a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -33,9 +33,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -74,8 +72,7 @@ import org.slf4j.LoggerFactory;
  * @param <OUT>
  * @param <O>
  */
-public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
-		OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointNotificationOperator {
+public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);