You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/11 16:19:15 UTC

[01/13] flink git commit: [hotfix] [tests] Add a lightweight test for classloading in the Kryo Serializer

Repository: flink
Updated Branches:
  refs/heads/master a4b901a45 -> fca8caea7


[hotfix] [tests] Add a lightweight test for classloading in the Kryo Serializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc279638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc279638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc279638

Branch: refs/heads/master
Commit: bc2796383af0beac989341b4e34f6418baa7106b
Parents: 70c48aa
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 12:37:28 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:51 2017 +0200

----------------------------------------------------------------------
 .../kryo/KryoSerializerClassLoadingTest.java    | 113 +++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc279638/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
new file mode 100644
index 0000000..9823e11
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that the Kryo-based serializer handles classes with custom
+ * class loaders correctly.
+ */
+public class KryoSerializerClassLoadingTest extends SerializerTestBase<Object> {
+
+	/** Class loader for the object that is not in the test class path */
+	private static final ClassLoader CLASS_LOADER =
+			new URLClassLoader(new URL[0], KryoSerializerClassLoadingTest.class.getClassLoader());
+
+	/** An object that is not in the test class path */
+	private static final Serializable OBJECT_OUT_OF_CLASSPATH =
+			CommonTestUtils.createObjectForClassNotInClassPath(CLASS_LOADER);
+
+	// ------------------------------------------------------------------------
+
+	private ClassLoader originalClassLoader;
+
+	@Before
+	public void setupClassLoader() {
+		originalClassLoader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(CLASS_LOADER);
+	}
+
+	@After
+	public void restoreOriginalClassLoader() {
+		Thread.currentThread().setContextClassLoader(originalClassLoader);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void guardTestAssumptions() {
+		try {
+			Class.forName(OBJECT_OUT_OF_CLASSPATH.getClass().getName());
+			fail("This test's assumptions are broken");
+		}
+		catch (ClassNotFoundException ignored) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected TypeSerializer<Object> createSerializer() {
+		return new KryoSerializer<>(Object.class, new ExecutionConfig());
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<Object> getTypeClass() {
+		return Object.class;
+	}
+
+	@Override
+	protected Object[] getTestData() {
+		return new Object[] {
+				new Integer(7),
+
+				// an object whose class is not on the classpath
+				OBJECT_OUT_OF_CLASSPATH,
+
+				// an object whose class IS on the classpath with a nested object whose class
+				// is NOT on the classpath
+				new Tuple1<>(OBJECT_OUT_OF_CLASSPATH)
+		};
+	}
+
+	@Override
+	public void testInstantiate() {
+		// this serializer does not support instantiation
+	}
+}


[07/13] flink git commit: [FLINK-6508] [build] Include licenses of bundled/shaded dependencies where required

Posted by se...@apache.org.
[FLINK-6508] [build] Include licenses of bundled/shaded dependencies where required


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ff59319
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ff59319
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ff59319

Branch: refs/heads/master
Commit: 2ff5931982111f37dd51895b7110c6074cb53276
Parents: aafa579
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 16:42:15 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:52 2017 +0200

----------------------------------------------------------------------
 flink-core/packaged_licenses/LICENSE.asm.txt    | 31 +++++++++++++++++++
 flink-java/packaged_licenses/LICENSE.asm.txt    | 31 +++++++++++++++++++
 .../packaged_licenses/LICENSE.janino.txt        | 31 +++++++++++++++++++
 .../packaged_licenses/LICENSE.reflections.txt   | 14 +++++++++
 flink-runtime/packaged_licenses/LICENSE.asm.txt | 31 +++++++++++++++++++
 flink-scala/packaged_licenses/LICENSE.asm.txt   | 31 +++++++++++++++++++
 pom.xml                                         | 32 +++++++++++++++++++-
 7 files changed, 200 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-core/packaged_licenses/LICENSE.asm.txt
----------------------------------------------------------------------
diff --git a/flink-core/packaged_licenses/LICENSE.asm.txt b/flink-core/packaged_licenses/LICENSE.asm.txt
new file mode 100644
index 0000000..62ffbcc
--- /dev/null
+++ b/flink-core/packaged_licenses/LICENSE.asm.txt
@@ -0,0 +1,31 @@
+ASM: a very small and fast Java bytecode manipulation framework
+
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-java/packaged_licenses/LICENSE.asm.txt
----------------------------------------------------------------------
diff --git a/flink-java/packaged_licenses/LICENSE.asm.txt b/flink-java/packaged_licenses/LICENSE.asm.txt
new file mode 100644
index 0000000..62ffbcc
--- /dev/null
+++ b/flink-java/packaged_licenses/LICENSE.asm.txt
@@ -0,0 +1,31 @@
+ASM: a very small and fast Java bytecode manipulation framework
+
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-libraries/flink-table/packaged_licenses/LICENSE.janino.txt
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/packaged_licenses/LICENSE.janino.txt b/flink-libraries/flink-table/packaged_licenses/LICENSE.janino.txt
new file mode 100644
index 0000000..ef871e2
--- /dev/null
+++ b/flink-libraries/flink-table/packaged_licenses/LICENSE.janino.txt
@@ -0,0 +1,31 @@
+Janino - An embedded Java[TM] compiler
+
+Copyright (c) 2001-2016, Arno Unkrig
+Copyright (c) 2015-2016  TIBCO Software Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+   1. Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+   2. Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials
+      provided with the distribution.
+   3. Neither the name of JANINO nor the names of its contributors
+      may be used to endorse or promote products derived from this
+      software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
+IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-libraries/flink-table/packaged_licenses/LICENSE.reflections.txt
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/packaged_licenses/LICENSE.reflections.txt b/flink-libraries/flink-table/packaged_licenses/LICENSE.reflections.txt
new file mode 100644
index 0000000..236d111
--- /dev/null
+++ b/flink-libraries/flink-table/packaged_licenses/LICENSE.reflections.txt
@@ -0,0 +1,14 @@
+            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
+                    Version 2, December 2004
+
+ Copyright (C) 2004 Sam Hocevar <sa...@hocevar.net>
+
+ Everyone is permitted to copy and distribute verbatim or modified
+ copies of this license document, and changing it is allowed as long
+ as the name is changed.
+
+            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
+   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+  0. You just DO WHAT THE FUCK YOU WANT TO.
+  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-runtime/packaged_licenses/LICENSE.asm.txt
----------------------------------------------------------------------
diff --git a/flink-runtime/packaged_licenses/LICENSE.asm.txt b/flink-runtime/packaged_licenses/LICENSE.asm.txt
new file mode 100644
index 0000000..62ffbcc
--- /dev/null
+++ b/flink-runtime/packaged_licenses/LICENSE.asm.txt
@@ -0,0 +1,31 @@
+ASM: a very small and fast Java bytecode manipulation framework
+
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/flink-scala/packaged_licenses/LICENSE.asm.txt
----------------------------------------------------------------------
diff --git a/flink-scala/packaged_licenses/LICENSE.asm.txt b/flink-scala/packaged_licenses/LICENSE.asm.txt
new file mode 100644
index 0000000..62ffbcc
--- /dev/null
+++ b/flink-scala/packaged_licenses/LICENSE.asm.txt
@@ -0,0 +1,31 @@
+ASM: a very small and fast Java bytecode manipulation framework
+
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff59319/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e0bf52..4cc3895 100644
--- a/pom.xml
+++ b/pom.xml
@@ -912,6 +912,9 @@ under the License.
 						<exclude>**/resources/**/bootstrap*</exclude>
 						<exclude>flink-clients/src/main/resources/web-docs/js/*d3.js</exclude>
 
+						<!-- the licenses that are re-bundled -->
+						<exclude>**/packaged_licenses/LICENSE.*.txt</exclude>
+
 						<!-- web dashboard config JSON files -->
 						<exclude>flink-runtime-web/web-dashboard/package.json</exclude>
 						<exclude>flink-runtime-web/web-dashboard/bower.json</exclude>
@@ -1115,6 +1118,34 @@ under the License.
 				</executions>
 			</plugin>
 
+			<!-- make sure all licenses that need to be repackaged are in the target folder -->
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<configuration>
+					<encoding>UTF-8</encoding>
+				</configuration>
+				<executions>
+					<execution>
+						<id>copy-packaged-licenses</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>copy-resources</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${basedir}/target/classes/META-INF/license/</outputDirectory>
+							<resources>
+								<resource>
+									<directory>${basedir}/packaged_licenses</directory>
+									<includes>
+										<include>LICENSE.*.txt</include>
+									</includes>
+								</resource>
+							</resources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 			<!-- We use shading in all packages for relocating some classes, such as
 				Guava and ASM.
 				By doing so, users adding Flink as a dependency won't run into conflicts.
@@ -1185,7 +1216,6 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-
 		</plugins>
 
 		<!-- Plugin configurations for plugins activated in sub-projects --> 


[04/13] flink git commit: [FLINK-6515] [runtime] Fix classloading of JavaSerializer

Posted by se...@apache.org.
[FLINK-6515] [runtime] Fix classloading of JavaSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8022e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8022e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8022e3

Branch: refs/heads/master
Commit: 6f8022e35e0a49d5dfffa0ab7fd1c964b1c1bf0d
Parents: 609bfa1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 11:20:07 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:51 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  10 +-
 .../flink/runtime/state/JavaSerializer.java     |  17 ++-
 .../flink/runtime/state/JavaSerializerTest.java | 116 +++++++++++++++++
 .../flink/core/testutils/CommonTestUtils.java   | 124 +++++++++++++++++++
 4 files changed, 257 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f8022e3/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b7ccace..8abdc3c 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -162,7 +162,7 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-		<!-- core dependencies -->
+		<!-- test dependencies -->
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -172,6 +172,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
 			<version>${curator.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/6f8022e3/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index d49b1d2..5252b3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.io.Serializable;
 
-@SuppressWarnings("serial")
 @Internal
 final class JavaSerializer<T extends Serializable> extends TypeSerializerSingleton<T> {
 
@@ -47,11 +47,10 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 
 	@Override
 	public T copy(T from) {
-
 		try {
-			return InstantiationUtil.clone(from);
+			return InstantiationUtil.clone(from, Thread.currentThread().getContextClassLoader());
 		} catch (IOException | ClassNotFoundException e) {
-			throw new RuntimeException("Could not copy instance of " + from + '.', e);
+			throw new FlinkRuntimeException("Could not copy element via serialization: " + from, e);
 		}
 	}
 
@@ -62,7 +61,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 
 	@Override
 	public int getLength() {
-		return 0;
+		return -1;
 	}
 
 	@Override
@@ -74,7 +73,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 	public T deserialize(DataInputView source) throws IOException {
 		try {
 			return InstantiationUtil.deserializeObject(
-					new DataInputViewStream(source), Thread.currentThread().getContextClassLoader());
+					new DataInputViewStream(source),
+					Thread.currentThread().getContextClassLoader());
 		} catch (ClassNotFoundException e) {
 			throw new IOException("Could not deserialize object.", e);
 		}
@@ -87,9 +87,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int size = source.readInt();
-		target.writeInt(size);
-		target.write(source, size);
+		T tmp = deserialize(source);
+		serialize(tmp, target);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6f8022e3/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java
new file mode 100644
index 0000000..de6fbce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.*;
+
+/**
+ * A test that verifies that the {@link JavaSerializer} properly handles class loading. 
+ */
+public class JavaSerializerTest extends SerializerTestBase<Serializable> {
+
+	/** Class loader for the object that is not in the test class path */
+	private static final ClassLoader CLASS_LOADER = 
+			new URLClassLoader(new URL[0], JavaSerializerTest.class.getClassLoader());
+
+	/** An object that is not in the test class path */
+	private static final Serializable OBJECT_OUT_OF_CLASSPATH = 
+			CommonTestUtils.createObjectForClassNotInClassPath(CLASS_LOADER);
+
+	// ------------------------------------------------------------------------
+
+	private ClassLoader originalClassLoader;
+
+	@Before
+	public void setupClassLoader() {
+		originalClassLoader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(CLASS_LOADER);
+	}
+
+	@After
+	public void restoreOriginalClassLoader() {
+		Thread.currentThread().setContextClassLoader(originalClassLoader);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void guardTest() {
+		// make sure that this test's assumptions hold
+		try {
+			Class.forName(OBJECT_OUT_OF_CLASSPATH.getClass().getName());
+			fail("Test ineffective: The test class that should not be on the classpath is actually on the classpath.");
+		} catch (ClassNotFoundException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected TypeSerializer<Serializable> createSerializer() {
+		Thread.currentThread().setContextClassLoader(CLASS_LOADER);
+		return new JavaSerializer<>();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<Serializable> getTypeClass() {
+		return Serializable.class;
+	}
+
+	@Override
+	protected Serializable[] getTestData() {
+		return new Serializable[] {
+				new Integer(42),
+				new File("/some/path/that/I/made/up"),
+
+				// an object that is not in the classpath
+				OBJECT_OUT_OF_CLASSPATH,
+
+				// an object that is in the classpath with a nested object not in the classpath
+				new Tuple1<>(OBJECT_OUT_OF_CLASSPATH)
+		};
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void testInstantiate() {
+		// this serializer does not support instantiation
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6f8022e3/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 639b065..cf2bb7f 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -29,7 +29,12 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.security.CodeSource;
+import java.security.Permissions;
+import java.security.ProtectionDomain;
+import java.security.cert.Certificate;
 import java.util.Map;
 
 import static org.junit.Assert.fail;
@@ -178,4 +183,123 @@ public class CommonTestUtils {
 			throw new RuntimeException(e1);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Testing of objects not in the application class loader
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new class that is not part of the classpath that the current JVM uses, and
+	 * instantiates it.
+	 *
+	 * <p>This method uses {@link #createClassNotInClassPath(ClassLoader)} to define the new class.
+	 *
+	 * @param targetClassLoader The class loader to attach the class to
+	 * @return The object instantiated from the newly defined class.
+	 */
+	public static Serializable createObjectForClassNotInClassPath(ClassLoader targetClassLoader) {
+		try {
+			Class<? extends Serializable> clazz = createClassNotInClassPath(targetClassLoader);
+			return clazz.newInstance();
+		}
+		catch (Exception e) {
+			throw new AssertionError("test setup broken", e);
+		}
+	}
+
+	/**
+	 * Creates a new class that is not part of the classpath that the current JVM uses.
+	 * The class is ad-hoc defined and attached to the given ClassLoader.
+	 *
+	 * @param targetClassLoader The class loader to attach the class to
+	 * @return The newly defined class
+	 */
+	public static Class<? extends Serializable> createClassNotInClassPath(ClassLoader targetClassLoader) {
+		final byte[] classData = {-54, -2, -70, -66, 0, 0, 0, 51, 0, 65, 10, 0, 15, 0, 43, 7, 0, 44,
+				10, 0, 2, 0, 43, 10, 0, 2, 0, 45, 9, 0, 7, 0, 46, 10, 0, 15, 0, 47, 7, 0, 48, 7, 0,
+				49, 10, 0, 8, 0, 43, 8, 0, 50, 10, 0, 8, 0, 51, 10, 0, 8, 0, 52, 10, 0, 8, 0, 53, 10,
+				0, 8, 0, 54, 7, 0, 55, 7, 0, 56, 1, 0, 16, 115, 101, 114, 105, 97, 108, 86, 101, 114,
+				115, 105, 111, 110, 85, 73, 68, 1, 0, 1, 74, 1, 0, 13, 67, 111, 110, 115, 116, 97, 110,
+				116, 86, 97, 108, 117, 101, 5, -1, -1, -1, -1, -1, -1, -1, -3, 1, 0, 6, 114, 97, 110,
+				100, 111, 109, 1, 0, 6, 60, 105, 110, 105, 116, 62, 1, 0, 3, 40, 41, 86, 1, 0, 4, 67,
+				111, 100, 101, 1, 0, 15, 76, 105, 110, 101, 78, 117, 109, 98, 101, 114, 84, 97, 98, 108,
+				101, 1, 0, 18, 76, 111, 99, 97, 108, 86, 97, 114, 105, 97, 98, 108, 101, 84, 97, 98,
+				108, 101, 1, 0, 4, 116, 104, 105, 115, 1, 0, 35, 76, 111, 114, 103, 47, 97, 112, 97, 99,
+				104, 101, 47, 102, 108, 105, 110, 107, 47, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108,
+				105, 122, 97, 98, 108, 101, 59, 1, 0, 6, 101, 113, 117, 97, 108, 115, 1, 0, 21, 40, 76,
+				106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99, 116, 59, 41, 90, 1, 0,
+				1, 111, 1, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99,
+				116, 59, 1, 0, 4, 116, 104, 97, 116, 1, 0, 13, 83, 116, 97, 99, 107, 77, 97, 112, 84, 97,
+				98, 108, 101, 7, 0, 48, 1, 0, 8, 104, 97, 115, 104, 67, 111, 100, 101, 1, 0, 3, 40, 41,
+				73, 1, 0, 8, 116, 111, 83, 116, 114, 105, 110, 103, 1, 0, 20, 40, 41, 76, 106, 97, 118, 97,
+				47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 1, 0, 10, 83, 111, 117, 114,
+				99, 101, 70, 105, 108, 101, 1, 0, 21, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105,
+				122, 97, 98, 108, 101, 46, 106, 97, 118, 97, 12, 0, 23, 0, 24, 1, 0, 16, 106, 97, 118, 97,
+				47, 117, 116, 105, 108, 47, 82, 97, 110, 100, 111, 109, 12, 0, 57, 0, 58, 12, 0, 22, 0, 18,
+				12, 0, 59, 0, 60, 1, 0, 33, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101, 47, 102, 108, 105,
+				110, 107, 47, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122, 97, 98, 108, 101, 1,
+				0, 23, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 66, 117,
+				105, 108, 100, 101, 114, 1, 0, 24, 84, 101, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122,
+				97, 98, 108, 101, 123, 114, 97, 110, 100, 111, 109, 61, 12, 0, 61, 0, 62, 12, 0, 61, 0, 63,
+				12, 0, 61, 0, 64, 12, 0, 39, 0, 40, 1, 0, 16, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47,
+				79, 98, 106, 101, 99, 116, 1, 0, 20, 106, 97, 118, 97, 47, 105, 111, 47, 83, 101, 114, 105,
+				97, 108, 105, 122, 97, 98, 108, 101, 1, 0, 8, 110, 101, 120, 116, 76, 111, 110, 103, 1, 0,
+				3, 40, 41, 74, 1, 0, 8, 103, 101, 116, 67, 108, 97, 115, 115, 1, 0, 19, 40, 41, 76, 106, 97,
+				118, 97, 47, 108, 97, 110, 103, 47, 67, 108, 97, 115, 115, 59, 1, 0, 6, 97, 112, 112, 101,
+				110, 100, 1, 0, 45, 40, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105,
+				110, 103, 59, 41, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110,
+				103, 66, 117, 105, 108, 100, 101, 114, 59, 1, 0, 28, 40, 74, 41, 76, 106, 97, 118, 97, 47,
+				108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 66, 117, 105, 108, 100, 101, 114, 59, 1,
+				0, 28, 40, 67, 41, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110,
+				103, 66, 117, 105, 108, 100, 101, 114, 59, 0, 33, 0, 7, 0, 15, 0, 1, 0, 16, 0, 2, 0, 26, 0,
+				17, 0, 18, 0, 1, 0, 19, 0, 0, 0, 2, 0, 20, 0, 18, 0, 22, 0, 18, 0, 0, 0, 4, 0, 1, 0, 23, 0,
+				24, 0, 1, 0, 25, 0, 0, 0, 69, 0, 3, 0, 1, 0, 0, 0, 19, 42, -73, 0, 1, 42, -69, 0, 2, 89, -73,
+				0, 3, -74, 0, 4, -75, 0, 5, -79, 0, 0, 0, 2, 0, 26, 0, 0, 0, 14, 0, 3, 0, 0, 0, 30, 0, 4, 0,
+				31, 0, 18, 0, 32, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 19, 0, 28, 0, 29, 0, 0, 0, 1, 0, 30, 0,
+				31, 0, 1, 0, 25, 0, 0, 0, -116, 0, 4, 0, 3, 0, 0, 0, 47, 42, 43, -90, 0, 5, 4, -84, 43, -58,
+				0, 14, 42, -74, 0, 6, 43, -74, 0, 6, -91, 0, 5, 3, -84, 43, -64, 0, 7, 77, 42, -76, 0, 5, 44,
+				-76, 0, 5, -108, -102, 0, 7, 4, -89, 0, 4, 3, -84, 0, 0, 0, 3, 0, 26, 0, 0, 0, 18, 0, 4, 0, 0,
+				0, 36, 0, 7, 0, 37, 0, 24, 0, 39, 0, 29, 0, 40, 0, 27, 0, 0, 0, 32, 0, 3, 0, 0, 0, 47, 0, 28,
+				0, 29, 0, 0, 0, 0, 0, 47, 0, 32, 0, 33, 0, 1, 0, 29, 0, 18, 0, 34, 0, 29, 0, 2, 0, 35, 0, 0,
+				0, 13, 0, 5, 7, 14, 1, -4, 0, 20, 7, 0, 36, 64, 1, 0, 1, 0, 37, 0, 38, 0, 1, 0, 25, 0, 0, 0,
+				56, 0, 5, 0, 1, 0, 0, 0, 14, 42, -76, 0, 5, 42, -76, 0, 5, 16, 32, 125, -125, -120, -84, 0, 0,
+				0, 2, 0, 26, 0, 0, 0, 6, 0, 1, 0, 0, 0, 46, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 14, 0, 28, 0,
+				29, 0, 0, 0, 1, 0, 39, 0, 40, 0, 1, 0, 25, 0, 0, 0, 70, 0, 3, 0, 1, 0, 0, 0, 28, -69, 0, 8,
+				89, -73, 0, 9, 18, 10, -74, 0, 11, 42, -76, 0, 5, -74, 0, 12, 16, 125, -74, 0, 13, -74, 0, 14,
+				-80, 0, 0, 0, 2, 0, 26, 0, 0, 0, 6, 0, 1, 0, 0, 0, 51, 0, 27, 0, 0, 0, 12, 0, 1, 0, 0, 0, 28,
+				0, 28, 0, 29, 0, 0, 0, 1, 0, 41, 0, 0, 0, 2, 0, 42,};
+
+		try {
+			// define a class into the classloader
+			Class<?> clazz = getUnsafe().defineClass(
+					"org.apache.flink.TestSerializable",
+					classData, 0, classData.length,
+					targetClassLoader,
+					new ProtectionDomain(new CodeSource(null, (Certificate[]) null), new Permissions()));
+
+			return clazz.asSubclass(Serializable.class);
+		}
+		catch (Exception e) {
+			throw new AssertionError("test setup broken", e);
+		}
+	}
+
+	@SuppressWarnings("restriction")
+	private static sun.misc.Unsafe getUnsafe() {
+		try {
+			Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+			unsafeField.setAccessible(true);
+			return (sun.misc.Unsafe) unsafeField.get(null);
+		} catch (SecurityException e) {
+			throw new RuntimeException("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e);
+		} catch (NoSuchFieldException e) {
+			throw new RuntimeException("The static handle field in sun.misc.Unsafe was not found.");
+		} catch (IllegalArgumentException e) {
+			throw new RuntimeException("Bug: Illegal argument reflection access for static field.", e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("Access to sun.misc.Unsafe is forbidden by the runtime.", e);
+		} catch (Throwable t) {
+			throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
+		}
+	}
 }


[10/13] flink git commit: [FLINK-6414] [build] Use scala.binary.version in place of change-scala-version.sh

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index 009781f..68ad050 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -27,7 +27,7 @@
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-gelly-examples_2.10</artifactId>
+	<artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
 	<name>flink-gelly-examples</name>
 	<packaging>jar</packaging>
 
@@ -42,22 +42,22 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-gelly_2.10</artifactId>
+			<artifactId>flink-gelly_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-gelly-scala_2.10</artifactId>
+			<artifactId>flink-gelly-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -94,14 +94,14 @@
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
index 256fc53..35b2188 100644
--- a/flink-libraries/flink-gelly-scala/pom.xml
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -28,7 +28,7 @@ under the License.
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>flink-gelly-scala_2.10</artifactId>
+    <artifactId>flink-gelly-scala_${scala.binary.version}</artifactId>
     <name>flink-gelly-scala</name>
 
     <packaging>jar</packaging>
@@ -38,21 +38,21 @@ under the License.
         <!-- core dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.10</artifactId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.10</artifactId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-gelly_2.10</artifactId>
+            <artifactId>flink-gelly_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -87,7 +87,7 @@ under the License.
         
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests_2.10</artifactId>
+            <artifactId>flink-tests_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>
@@ -95,7 +95,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
index d656bb3..d620a66 100644
--- a/flink-libraries/flink-gelly/pom.xml
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-gelly_2.10</artifactId>
+	<artifactId>flink-gelly_${scala.binary.version}</artifactId>
 	<name>flink-gelly</name>
 
 	<packaging>jar</packaging>
@@ -47,7 +47,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -62,7 +62,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -77,7 +77,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
+			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index 8aa84d2..0f203b9 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -28,7 +28,7 @@
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-ml_2.10</artifactId>
+	<artifactId>flink-ml_${scala.binary.version}</artifactId>
 	<name>flink-ml</name>
 
 	<packaging>jar</packaging>
@@ -39,7 +39,7 @@
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -86,14 +86,14 @@
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
index 9d2f0c3..5b6405e 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -27,7 +27,7 @@ under the License.
         <relativePath>..</relativePath>
     </parent>
 
-    <artifactId>flink-python_2.10</artifactId>
+    <artifactId>flink-python_${scala.binary.version}</artifactId>
     <name>flink-python</name>
     <packaging>jar</packaging>
 
@@ -69,7 +69,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.10</artifactId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
 			<scope>provided</scope>
         </dependency>
@@ -78,7 +78,7 @@ under the License.
         
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 8fa2ed2..a34fa1b 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-table_2.10</artifactId>
+	<artifactId>flink-table_${scala.binary.version}</artifactId>
 	<name>flink-table</name>
 
 	<packaging>jar</packaging>
@@ -38,7 +38,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -114,14 +114,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -137,20 +137,20 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index d03144f..a1ef900 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 	
-	<artifactId>flink-mesos_2.10</artifactId>
+	<artifactId>flink-mesos_${scala.binary.version}</artifactId>
 	<name>flink-mesos</name>
 	<packaging>jar</packaging>
 
@@ -38,13 +38,13 @@ under the License.
     <dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -130,7 +130,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -138,7 +138,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-metrics/flink-metrics-datadog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml
index 0d473fc..656f308 100644
--- a/flink-metrics/flink-metrics-datadog/pom.xml
+++ b/flink-metrics/flink-metrics-datadog/pom.xml
@@ -63,7 +63,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml
index 2ba12e6..631d68e 100644
--- a/flink-metrics/flink-metrics-dropwizard/pom.xml
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -58,7 +58,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-metrics/flink-metrics-jmx/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml
index d738fc4..1e8ecc8 100644
--- a/flink-metrics/flink-metrics-jmx/pom.xml
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -49,7 +49,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -65,7 +65,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml
index 80cfa32..5b77f8d 100644
--- a/flink-metrics/flink-metrics-statsd/pom.xml
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -51,7 +51,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-optimizer/pom.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml
index c347082..499487b 100644
--- a/flink-optimizer/pom.xml
+++ b/flink-optimizer/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-optimizer_2.10</artifactId>
+	<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 	<name>flink-optimizer</name>
 
 	<packaging>jar</packaging>
@@ -46,7 +46,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 2fa1835..9991d2c 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -82,12 +82,12 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 
@@ -124,13 +124,13 @@ under the License.
 				</dependency>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-streaming-java_2.10</artifactId>
+					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-clients_2.10</artifactId>
+					<artifactId>flink-clients_${scala.binary.version}</artifactId>
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
@@ -201,20 +201,20 @@ under the License.
 									<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
 									<exclude>org.apache.flink:flink-core</exclude>
 									<exclude>org.apache.flink:flink-java</exclude>
-									<exclude>org.apache.flink:flink-scala_2.10</exclude>
-									<exclude>org.apache.flink:flink-runtime_2.10</exclude>
-									<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
-									<exclude>org.apache.flink:flink-clients_2.10</exclude>
-									<exclude>org.apache.flink:flink-avro_2.10</exclude>
-									<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
-									<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
-									<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
-									<exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
-									<exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
+									<exclude>org.apache.flink:flink-scala_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-runtime_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-optimizer_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-clients_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-avro_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-examples-batch_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-examples-streaming_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-streaming-java_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-streaming-scala_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-scala-shell_${scala.binary.version}</exclude>
 									<exclude>org.apache.flink:flink-python</exclude>
 									<exclude>org.apache.flink:flink-metrics-core</exclude>
 									<exclude>org.apache.flink:flink-metrics-jmx</exclude>
-									<exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>
+									<exclude>org.apache.flink:flink-statebackend-rocksdb_${scala.binary.version}</exclude>
 
 									<!-- Also exclude very big transitive dependencies of Flink
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 189bbce..2139c6b 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -78,17 +78,17 @@ under the License.
 		<!-- Apache Flink dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		
@@ -116,19 +116,19 @@ under the License.
 			<dependencies>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-scala_2.10</artifactId>
+					<artifactId>flink-scala_${scala.binary.version}</artifactId>
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-streaming-scala_2.10</artifactId>
+					<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-clients_2.10</artifactId>
+					<artifactId>flink-clients_${scala.binary.version}</artifactId>
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
@@ -202,20 +202,20 @@ under the License.
 									<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
 									<exclude>org.apache.flink:flink-core</exclude>
 									<exclude>org.apache.flink:flink-java</exclude>
-									<exclude>org.apache.flink:flink-scala_2.10</exclude>
-									<exclude>org.apache.flink:flink-runtime_2.10</exclude>
-									<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
-									<exclude>org.apache.flink:flink-clients_2.10</exclude>
-									<exclude>org.apache.flink:flink-avro_2.10</exclude>
-									<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
-									<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
-									<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
-									<exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
-									<exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
+									<exclude>org.apache.flink:flink-scala_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-runtime_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-optimizer_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-clients_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-avro_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-examples-batch_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-examples-streaming_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-streaming-java_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-streaming-scala_${scala.binary.version}</exclude>
+									<exclude>org.apache.flink:flink-scala-shell_${scala.binary.version}</exclude>
 									<exclude>org.apache.flink:flink-python</exclude>
 									<exclude>org.apache.flink:flink-metrics-core</exclude>
 									<exclude>org.apache.flink:flink-metrics-jmx</exclude>
-									<exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>
+									<exclude>org.apache.flink:flink-statebackend-rocksdb_${scala.binary.version}</exclude>
 
 									<!-- Also exclude very big transitive dependencies of Flink
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index e7870f3..8a4671b 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-runtime-web_2.10</artifactId>
+	<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
 	<name>flink-runtime-web</name>
 
 	<packaging>jar</packaging>
@@ -42,12 +42,12 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -99,7 +99,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 8abdc3c..a13a985 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-runtime_2.10</artifactId>
+	<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 	<name>flink-runtime</name>
 
 	<packaging>jar</packaging>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 0aadfc6..c9acbec 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-scala-shell_2.10</artifactId>
+	<artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
 	<name>flink-scala-shell</name>
 
 	<packaging>jar</packaging>
@@ -44,19 +44,19 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -82,7 +82,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -214,14 +214,14 @@ under the License.
 			<id>scala-2.10</id>
 			<activation>
 				<property>
-					<!-- this is the default scala profile -->
+					<!-- only required for Scala 2.10 -->
 					<name>!scala-2.11</name>
 				</property>
 			</activation>
 			<dependencies>
 				<dependency>
 					<groupId>org.scalamacros</groupId>
-					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+					<artifactId>quasiquotes_2.10</artifactId>
 					<version>${scala.macros.version}</version>
 				</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 7be1e29..7fad487 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -28,7 +28,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-scala_2.10</artifactId>
+	<artifactId>flink-scala_${scala.binary.version}</artifactId>
 	<name>flink-scala</name>
 	<packaging>jar</packaging>
 
@@ -82,7 +82,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -241,14 +241,14 @@ under the License.
 			<id>scala-2.10</id>
 			<activation>
 				<property>
-					<!-- this is the default scala profile -->
+					<!-- only required for Scala 2.10 -->
 					<name>!scala-2.11</name>
 				</property>
 			</activation>
 			<dependencies>
 				<dependency>
 					<groupId>org.scalamacros</groupId>
-					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+					<artifactId>quasiquotes_2.10</artifactId>
 					<version>${scala.macros.version}</version>
 				</dependency>
 			</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index ea987af..46142de 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-streaming-java_2.10</artifactId>
+	<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 	<name>flink-streaming-java</name>
 
 	<packaging>jar</packaging>
@@ -46,13 +46,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -85,7 +85,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 4ec12bd..586dca7 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -28,7 +28,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-streaming-scala_2.10</artifactId>
+	<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 	<name>flink-streaming-scala</name>
 	<packaging>jar</packaging>
 
@@ -38,13 +38,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -85,14 +85,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -100,7 +100,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -108,7 +108,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 36bdceb..4f53794 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-test-utils_2.10</artifactId>
+	<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 	<name>flink-test-utils</name>
 
 	<packaging>jar</packaging>
@@ -45,21 +45,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>compile</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>compile</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>compile</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index b67edbb..3c0b184 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-tests_2.10</artifactId>
+	<artifactId>flink-tests_${scala.binary.version}</artifactId>
 	<name>flink-tests</name>
 
 	<packaging>jar</packaging>
@@ -54,28 +54,28 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
+			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime-web_2.10</artifactId>
+			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime-web_2.10</artifactId>
+			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -83,7 +83,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -97,14 +97,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -112,21 +112,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_2.10</artifactId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -141,14 +141,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -156,7 +156,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro_2.10</artifactId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -164,14 +164,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro_2.10</artifactId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
+			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -179,7 +179,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -224,7 +224,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index f296e8d..626d886 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -33,7 +33,7 @@ under the License.
 	We need the YARN fat jar build by flink-dist for the tests.
 	-->
 	
-	<artifactId>flink-yarn-tests_2.10</artifactId>
+	<artifactId>flink-yarn-tests_${scala.binary.version}</artifactId>
 	<name>flink-yarn-tests</name>
 	<packaging>jar</packaging>
 
@@ -43,14 +43,14 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -59,21 +59,21 @@ under the License.
 		<!-- Needed for the streaming wordcount example -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-yarn_2.10</artifactId>
+			<artifactId>flink-yarn_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-yarn_2.10</artifactId>
+			<artifactId>flink-yarn_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 5946094..e97fee5 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 	
-	<artifactId>flink-yarn_2.10</artifactId>
+	<artifactId>flink-yarn_${scala.binary.version}</artifactId>
 	<name>flink-yarn</name>
 	<packaging>jar</packaging>
 
@@ -37,7 +37,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<exclusion>
@@ -49,7 +49,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -61,7 +61,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -98,7 +98,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/tools/change-scala-version.sh
----------------------------------------------------------------------
diff --git a/tools/change-scala-version.sh b/tools/change-scala-version.sh
deleted file mode 100755
index 56c48c6..0000000
--- a/tools/change-scala-version.sh
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This shell script is from Apache Spark with some modification.
-
-set -e
-
-VALID_VERSIONS=( 2.10 2.11 )
-
-usage() {
-  echo "Usage: $(basename $0) [-h|--help] <scala version to be used>
-where :
-  -h| --help Display this help text
-  valid scala version values : ${VALID_VERSIONS[*]}
-" 1>&2
-  exit 1
-}
-
-if [[ ($# -ne 1) || ( $1 == "--help") ||  $1 == "-h" ]]; then
-  usage
-fi
-
-TO_VERSION=$1
-
-check_scala_version() {
-  for i in ${VALID_VERSIONS[*]}; do [ $i = "$1" ] && return 0; done
-  echo "Invalid Scala version: $1. Valid versions: ${VALID_VERSIONS[*]}" 1>&2
-  exit 1
-}
-
-check_scala_version "$TO_VERSION"
-
-if [ $TO_VERSION = "2.11" ]; then
-  FROM_SUFFIX="_2\.10"
-  TO_SUFFIX="_2\.11"
-else
-  FROM_SUFFIX="_2\.11"
-  TO_SUFFIX="_2\.10"
-fi
-
-sed_i() {
-  sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2"
-}
-
-export -f sed_i
-
-echo "sed_i 's/\(artifactId>flink.*'$FROM_SUFFIX'\)<\/artifactId>/\1'$TO_SUFFIX'<\/artifactId>/g' {}";
-
-BASEDIR=$(dirname $0)/..
-find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(artifactId>flink.*\)'$FROM_SUFFIX'<\/artifactId>/\1'$TO_SUFFIX'<\/artifactId>/g' {}" \;
-
-# fix for examples
-find "$BASEDIR/flink-examples/flink-examples-batch" -name 'pom.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<copy file=\".*flink-examples-batch\)'$FROM_SUFFIX'/\1'$TO_SUFFIX'/g' {}" \;
-
-find "$BASEDIR/flink-examples/flink-examples-streaming" -name 'pom.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<copy file=\".*flink-examples-streaming\)'$FROM_SUFFIX'/\1'$TO_SUFFIX'/g' {}" \;
-
-# fix for quickstart
-find "$BASEDIR/flink-quickstart" -name 'pom.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<exclude>org\.apache\.flink:flink-.*\)'$FROM_SUFFIX'<\/exclude>/\1'$TO_SUFFIX'<\/exclude>/g' {}" \;
-
-# fix for flink-dist (bin.xml)
-find "$BASEDIR/flink-dist" -name 'bin.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<source>.*flink-dist\)'$FROM_SUFFIX'/\1'$TO_SUFFIX'/g' {}" \;
-find "$BASEDIR/flink-dist" -name 'bin.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<include>org\.apache\.flink:flink-.*\)'$FROM_SUFFIX'<\/include>/\1'$TO_SUFFIX'<\/include>/g' {}" \;
-
-# fix for flink-dist (opt.xml)
-find "$BASEDIR/flink-dist" -name 'opt.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<source>.*flink-.*\)'$FROM_SUFFIX'/\1'$TO_SUFFIX'/g' {}" \;
-find "$BASEDIR/flink-dist" -name 'opt.xml' -not -path '*target*' -print \
-  -exec bash -c "sed_i 's/\(<destName>flink-.*\)'$FROM_SUFFIX'/\1'$TO_SUFFIX'/g' {}" \;
-
-# fix for shading curator with Scala 2.11
-find "$BASEDIR/flink-runtime" -name 'pom.xml' -not -path '*target*' -print \
-     -exec bash -c "sed_i 's/\(<include>org\.apache\.flink:flink-shaded-curator.*\)'$FROM_SUFFIX'<\/include>/\1'$TO_SUFFIX'<\/include>/g' {}" \;
-
-if [ "$TO_VERSION" == "2.11" ]; then
-  # set the profile activation to !scala-2.11 in parent pom, so that it activates by default
-  bash -c "sed_i 's/<name>scala-2.11<\/name>/<name>!scala-2.11<\/name>/g' $BASEDIR/pom.xml" \;
-  # set the profile activation in all sub modules to scala-2.11 (so that they are disabled by default)
-  find $BASEDIR/flink-* -name 'pom.xml' -not -path '*target*' -print \
-    -exec bash -c "sed_i 's/<name>!scala-2.11<\/name>/<name>scala-2.11<\/name>/g' {}" \;
-
-  # set the name of the shading artifact properly
-  bash -c "sed_i 's/\(shading-artifact.name>flink-shaded[a-z0-9\-]*\)'$FROM_SUFFIX'<\/shading-artifact.name>/\1'$TO_SUFFIX'<\/shading-artifact.name>/g' $BASEDIR/pom.xml" \;
-fi
-
-if [ "$TO_VERSION" == "2.10" ]; then
-  # do the opposite as above
-  bash -c "sed_i 's/<name>!scala-2.11<\/name>/<name>scala-2.11<\/name>/g' $BASEDIR/pom.xml" \;
-  # also for the other files
-  find $BASEDIR/flink-* -name 'pom.xml' -not -path '*target*' -print \
-    -exec bash -c "sed_i 's/<name>scala-2.11<\/name>/<name>!scala-2.11<\/name>/g' {}" \;
-
-  # unset shading artifact name
-  bash -c "sed_i 's/\(shading-artifact.name>flink-shaded[a-z0-9\-]*\)'$FROM_SUFFIX'<\/shading-artifact.name>/\1'$TO_SUFFIX'<\/shading-artifact.name>/g' $BASEDIR/pom.xml" \;
-fi
-

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/tools/create_release_files.sh
----------------------------------------------------------------------
diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh
index f4c4673..40d038c 100755
--- a/tools/create_release_files.sh
+++ b/tools/create_release_files.sh
@@ -216,10 +216,9 @@ make_binary_release() {
 
   # make distribution
   cd "${dir_name}"
-  ./tools/change-scala-version.sh ${SCALA_VERSION}
 
   # enable release profile here (to check for the maven version)
-  $MVN clean package $FLAGS -DskipTests -Prelease -Dgpg.skip
+  $MVN clean package $FLAGS -DskipTests -Prelease,scala-${SCALA_VERSION} -Dgpg.skip
 
   cd flink-dist/target/flink-*-bin/
   tar czf "${dir_name}.tgz" flink-*
@@ -243,15 +242,12 @@ deploy_to_maven() {
   cp ../../deploysettings.xml .
   
   echo "Deploying Scala 2.11 version"
-  cd tools && ./change-scala-version.sh 2.11 && cd ..
-
-  $MVN clean deploy -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Prelease,docs-and-source,scala-2.11 --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
   
   # It is important to first deploy scala 2.11 and then scala 2.10 so that the quickstarts (which are independent of the scala version)
   # are depending on scala 2.10.
   echo "Deploying Scala 2.10 version"
-  cd tools && ./change-scala-version.sh 2.10 && cd ..
-  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Prelease,docs-and-source,scala-2.10 --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
 }
 
 copy_data() {

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/tools/deploy_to_maven.sh
----------------------------------------------------------------------
diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh
index 16d529e..32acc5a 100755
--- a/tools/deploy_to_maven.sh
+++ b/tools/deploy_to_maven.sh
@@ -89,18 +89,14 @@ if [[ $CURRENT_FLINK_VERSION == *SNAPSHOT* ]] ; then
 
     # hadoop2 scala 2.10
     echo "deploy standard version (hadoop2) for scala 2.10"
-    mvn ${MVN_SNAPSHOT_OPTS}
+    mvn ${MVN_SNAPSHOT_OPTS} -Pscala-2.10
     deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2"
 
     # hadoop2 scala 2.11
     echo "deploy hadoop2 version (standard) for scala 2.11"
-    ./tools/change-scala-version.sh 2.11
-    mvn ${MVN_SNAPSHOT_OPTS}
+    mvn ${MVN_SNAPSHOT_OPTS} -Pscala-2.11
     deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2_2.11"
 
-    echo "Changing back to scala 2.10"
-    ./tools/change-scala-version.sh 2.10
-
     exit 0
 else
     exit 1


[02/13] flink git commit: [hotfix] [build] Move JSON dependency in flink-storm into proper section

Posted by se...@apache.org.
[hotfix] [build] Move JSON dependency in flink-storm into proper section


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/faee90e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/faee90e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/faee90e7

Branch: refs/heads/master
Commit: faee90e716826e7cafbac19aedd00037bfaae3cb
Parents: a4b901a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon May 8 20:58:01 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:51 2017 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/faee90e7/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index ec34ba9..5189b3b 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -164,6 +164,12 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -173,11 +179,6 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>com.googlecode.json-simple</groupId>
-			<artifactId>json-simple</artifactId>
-			<version>1.1</version>
-		</dependency>
 
 	</dependencies>
 


[05/13] flink git commit: [FLINK-6501] [build] Add NOTICE transformers to shading

Posted by se...@apache.org.
[FLINK-6501] [build] Add NOTICE transformers to shading

This makes sure that transitive NOTICE files are added to the shaded JAR files.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/609bfa1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/609bfa1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/609bfa1d

Branch: refs/heads/master
Commit: 609bfa1db66e98e82792b6140748f14d10b79209
Parents: faee90e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon May 8 20:58:26 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:51 2017 +0200

----------------------------------------------------------------------
 pom.xml | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/609bfa1d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 10677a4..4e0bf52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1173,6 +1173,14 @@ under the License.
 									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
 								</relocation>
 							</relocations>
+							<transformers>
+								<!-- The service transformer is needed to merge META-INF/services files -->
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+								<!-- The ApacheNoticeResourceTransformer collects and aggregates NOTICE files -->
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+								<!-- The ApacheLicenseResourceTransformer prevents duplicate Apache Licenses -->
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+							</transformers>
 						</configuration>
 					</execution>
 				</executions>


[11/13] flink git commit: [FLINK-6414] [build] Use scala.binary.version in place of change-scala-version.sh

Posted by se...@apache.org.
[FLINK-6414] [build] Use scala.binary.version in place of change-scala-version.sh

Use scala.binary.version as defined in the parent POM and remove the
script to swap scala version identifiers.

This closes #3800


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35c08712
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35c08712
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35c08712

Branch: refs/heads/master
Commit: 35c087129e2a27c2db47c5ed5ce3fb3523a7c719
Parents: 2ff5931
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Apr 27 14:43:18 2017 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 14:11:07 2017 +0200

----------------------------------------------------------------------
 flink-clients/pom.xml                           |   6 +-
 flink-connectors/flink-avro/pom.xml             |   6 +-
 .../flink-connector-cassandra/pom.xml           |  12 +-
 .../flink-connector-elasticsearch-base/pom.xml  |  10 +-
 .../flink-connector-elasticsearch/pom.xml       |  12 +-
 .../flink-connector-elasticsearch2/pom.xml      |  12 +-
 .../flink-connector-elasticsearch5/pom.xml      |  12 +-
 .../flink-connector-filesystem/pom.xml          |  14 +--
 .../flink-connector-kafka-0.10/pom.xml          |  20 ++--
 .../flink-connector-kafka-0.8/pom.xml           |  18 +--
 .../flink-connector-kafka-0.9/pom.xml           |  18 +--
 .../flink-connector-kafka-base/pom.xml          |  16 +--
 .../flink-connector-kinesis/pom.xml             |  12 +-
 flink-connectors/flink-connector-nifi/pom.xml   |  10 +-
 .../flink-connector-rabbitmq/pom.xml            |   8 +-
 .../flink-connector-twitter/pom.xml             |   4 +-
 .../flink-hadoop-compatibility/pom.xml          |   6 +-
 flink-connectors/flink-hbase/pom.xml            |  18 +--
 flink-connectors/flink-hcatalog/pom.xml         |   2 +-
 flink-connectors/flink-jdbc/pom.xml             |   4 +-
 flink-contrib/flink-connector-wikiedits/pom.xml |   4 +-
 .../flink-statebackend-rocksdb/pom.xml          |  10 +-
 flink-contrib/flink-storm-examples/pom.xml      |  24 ++--
 flink-contrib/flink-storm/pom.xml               |   6 +-
 flink-contrib/flink-streaming-contrib/pom.xml   |  10 +-
 flink-contrib/flink-tweet-inputformat/pom.xml   |   6 +-
 flink-dist/pom.xml                              |  42 +++----
 flink-dist/src/main/assemblies/bin.xml          |   4 +-
 flink-dist/src/main/assemblies/opt.xml          |  24 ++--
 flink-examples/flink-examples-batch/pom.xml     |  22 ++--
 flink-examples/flink-examples-streaming/pom.xml |  36 +++---
 flink-examples/flink-examples-table/pom.xml     |  10 +-
 flink-examples/pom.xml                          |   4 +-
 flink-fs-tests/pom.xml                          |  12 +-
 flink-java8/pom.xml                             |  12 +-
 flink-libraries/flink-cep-scala/pom.xml         |  14 +--
 flink-libraries/flink-cep/pom.xml               |  12 +-
 flink-libraries/flink-gelly-examples/pom.xml    |  14 +--
 flink-libraries/flink-gelly-scala/pom.xml       |  12 +-
 flink-libraries/flink-gelly/pom.xml             |   8 +-
 flink-libraries/flink-ml/pom.xml                |   8 +-
 flink-libraries/flink-python/pom.xml            |   6 +-
 flink-libraries/flink-table/pom.xml             |  14 +--
 flink-mesos/pom.xml                             |  10 +-
 flink-metrics/flink-metrics-datadog/pom.xml     |   2 +-
 flink-metrics/flink-metrics-dropwizard/pom.xml  |   2 +-
 flink-metrics/flink-metrics-jmx/pom.xml         |   4 +-
 flink-metrics/flink-metrics-statsd/pom.xml      |   2 +-
 flink-optimizer/pom.xml                         |   4 +-
 .../main/resources/archetype-resources/pom.xml  |  30 ++---
 .../main/resources/archetype-resources/pom.xml  |  34 +++---
 flink-runtime-web/pom.xml                       |   8 +-
 flink-runtime/pom.xml                           |   2 +-
 flink-scala-shell/pom.xml                       |  14 +--
 flink-scala/pom.xml                             |   8 +-
 flink-streaming-java/pom.xml                    |   8 +-
 flink-streaming-scala/pom.xml                   |  14 +--
 .../flink-test-utils/pom.xml                    |   8 +-
 flink-tests/pom.xml                             |  36 +++---
 flink-yarn-tests/pom.xml                        |  12 +-
 flink-yarn/pom.xml                              |  10 +-
 tools/change-scala-version.sh                   | 117 -------------------
 tools/create_release_files.sh                   |  10 +-
 tools/deploy_to_maven.sh                        |   8 +-
 64 files changed, 376 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 8dde2e9..deac7fc 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-clients_2.10</artifactId>
+	<artifactId>flink-clients_${scala.binary.version}</artifactId>
 	<name>flink-clients</name>
 
 	<packaging>jar</packaging>
@@ -47,13 +47,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
+			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
index 170b344..ed99eac 100644
--- a/flink-connectors/flink-avro/pom.xml
+++ b/flink-connectors/flink-avro/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-avro_2.10</artifactId>
+	<artifactId>flink-avro_${scala.binary.version}</artifactId>
 	<name>flink-avro</name>
 
 	<packaging>jar</packaging>
@@ -62,14 +62,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index d01c769..2722c30 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-cassandra_2.10</artifactId>
+	<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
 	<name>flink-connector-cassandra</name>
 
 	<packaging>jar</packaging>
@@ -94,7 +94,7 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -136,27 +136,27 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index e183354..79b4f50 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+	<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 	<name>flink-connector-elasticsearch-base</name>
 
 	<packaging>jar</packaging>
@@ -46,7 +46,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -61,14 +61,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -76,7 +76,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml b/flink-connectors/flink-connector-elasticsearch/pom.xml
index 07028df..93e4eb6 100644
--- a/flink-connectors/flink-connector-elasticsearch/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-elasticsearch_2.10</artifactId>
+	<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
 	<name>flink-connector-elasticsearch</name>
 
 	<packaging>jar</packaging>
@@ -46,14 +46,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -61,14 +61,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -76,7 +76,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml
index ec1b7da..7e21b8f 100644
--- a/flink-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-elasticsearch2_2.10</artifactId>
+	<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
 	<name>flink-connector-elasticsearch2</name>
 
 	<packaging>jar</packaging>
@@ -46,14 +46,14 @@ under the License.
  
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -68,14 +68,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -83,7 +83,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml
index 93d7bbe..72a0d18 100644
--- a/flink-connectors/flink-connector-elasticsearch5/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-elasticsearch5_2.10</artifactId>
+	<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
 	<name>flink-connector-elasticsearch5</name>
 
 	<packaging>jar</packaging>
@@ -46,14 +46,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
@@ -88,14 +88,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -103,7 +103,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<exclusion>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index 28d9962..07b0ae1 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-filesystem_2.10</artifactId>
+	<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
 	<name>flink-connector-filesystem</name>
 
 	<packaging>jar</packaging>
@@ -45,7 +45,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -68,21 +68,21 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -90,7 +90,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -98,7 +98,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index c17386d..231b22e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-kafka-0.10_2.10</artifactId>
+	<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
 	<name>flink-connector-kafka-0.10</name>
 
 	<packaging>jar</packaging>
@@ -46,7 +46,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+			<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<exclusion>
@@ -60,7 +60,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -75,7 +75,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -87,7 +87,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -95,7 +95,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+			<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<!-- exclude Kafka dependencies -->
@@ -110,7 +110,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<!-- exclude Kafka dependencies -->
@@ -133,7 +133,7 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -141,14 +141,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index ceccb3e..5e2ed2d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+	<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
 	<name>flink-connector-kafka-0.8</name>
 
 	<packaging>jar</packaging>
@@ -46,7 +46,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -60,13 +60,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -122,7 +122,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -144,7 +144,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -152,7 +152,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -160,14 +160,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index de4dedb..0140353 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+	<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
 	<name>flink-connector-kafka-0.9</name>
 
 	<packaging>jar</packaging>
@@ -46,14 +46,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<exclusion>
@@ -65,7 +65,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -83,7 +83,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -91,7 +91,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-base_2.10</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<exclusions>
 				<!-- exclude 0.8 dependencies -->
@@ -121,7 +121,7 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -129,14 +129,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index a6b2349..2cc94b0 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-kafka-base_2.10</artifactId>
+	<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 	<name>flink-connector-kafka-base</name>
 
 	<packaging>jar</packaging>
@@ -46,14 +46,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -132,7 +132,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -140,14 +140,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -155,7 +155,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -163,7 +163,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro_2.10</artifactId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 7515f31..4628937 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-kinesis_2.10</artifactId>
+	<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
 	<name>flink-connector-kinesis</name>
 	<properties>
 		<aws.sdk.version>1.10.71</aws.sdk.version>
@@ -44,7 +44,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -57,7 +57,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -65,7 +65,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -73,7 +73,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -81,7 +81,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/pom.xml b/flink-connectors/flink-connector-nifi/pom.xml
index 67bfb29..efa7ae7 100644
--- a/flink-connectors/flink-connector-nifi/pom.xml
+++ b/flink-connectors/flink-connector-nifi/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-nifi_2.10</artifactId>
+	<artifactId>flink-connector-nifi_${scala.binary.version}</artifactId>
 	<name>flink-connector-nifi</name>
 
 	<packaging>jar</packaging>
@@ -48,26 +48,26 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
 			<scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests_2.10</artifactId>
+            <artifactId>flink-tests_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
 			<type>test-jar</type>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index f26e57c..38337f4 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-rabbitmq_2.10</artifactId>
+	<artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
 	<name>flink-connector-rabbitmq</name>
 
 	<packaging>jar</packaging>
@@ -44,7 +44,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -57,7 +57,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -65,7 +65,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/pom.xml b/flink-connectors/flink-connector-twitter/pom.xml
index c683da1..38bfb30 100644
--- a/flink-connectors/flink-connector-twitter/pom.xml
+++ b/flink-connectors/flink-connector-twitter/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-twitter_2.10</artifactId>
+	<artifactId>flink-connector-twitter_${scala.binary.version}</artifactId>
 	<name>flink-connector-twitter</name>
 
 	<packaging>jar</packaging>
@@ -43,7 +43,7 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml
index 003225b..2dee17d 100644
--- a/flink-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+	<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 	<name>flink-hadoop-compatibility</name>
 
 	<packaging>jar</packaging>
@@ -47,7 +47,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -70,7 +70,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index 6a295a8..949e27a 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-hbase_2.10</artifactId>
+	<artifactId>flink-hbase_${scala.binary.version}</artifactId>
 	<name>flink-hbase</name>
 	<packaging>jar</packaging>
 
@@ -89,7 +89,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -98,7 +98,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -107,7 +107,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -124,7 +124,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 
@@ -206,7 +206,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<exclusions>
@@ -219,13 +219,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-shaded-include-yarn_2.10</artifactId>
+					<artifactId>flink-shaded-include-yarn_${scala.binary.version}</artifactId>
 				</exclusion>
 			</exclusions>
 		</dependency>
@@ -271,7 +271,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/pom.xml b/flink-connectors/flink-hcatalog/pom.xml
index a975e53..a9fbceb 100644
--- a/flink-connectors/flink-hcatalog/pom.xml
+++ b/flink-connectors/flink-hcatalog/pom.xml
@@ -44,7 +44,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 38a0163..a2bbaf4 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -39,7 +39,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project,
@@ -56,7 +56,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-connector-wikiedits/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index fed8656..20b6db4 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-wikiedits_2.10</artifactId>
+	<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
 	<name>flink-connector-wikiedits</name>
 
 	<packaging>jar</packaging>
@@ -37,7 +37,7 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 2148706..527ca18 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -31,7 +31,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+	<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 	<name>flink-statebackend-rocksdb</name>
 
 	<packaging>jar</packaging>
@@ -42,14 +42,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -78,7 +78,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -86,7 +86,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index 4d9d76c..6ef0f7b 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-storm-examples_2.10</artifactId>
+	<artifactId>flink-storm-examples_${scala.binary.version}</artifactId>
 	<name>flink-storm-examples</name>
 
 	<packaging>jar</packaging>
@@ -56,19 +56,19 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-storm_2.10</artifactId>
+			<artifactId>flink-storm_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_2.10</artifactId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -95,14 +95,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -128,7 +128,7 @@ under the License.
 							<artifactItems>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_2.10</artifactId>
+									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>
@@ -138,7 +138,7 @@ under the License.
 								</artifactItem>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-storm_2.10</artifactId>
+									<artifactId>flink-storm_${scala.binary.version}</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>
@@ -326,8 +326,8 @@ under the License.
 									<!-- Storm's recursive dependencies -->
 									<include>org.yaml:snakeyaml</include>
 									<include>com.googlecode.json-simple:json-simple</include>
-									<include>org.apache.flink:flink-storm_2.10</include>
-									<include>org.apache.flink:flink-storm-examples_2.10</include>
+									<include>org.apache.flink:flink-storm_${scala.binary.version}</include>
+									<include>org.apache.flink:flink-storm-examples_${scala.binary.version}</include>
 								</includes>
 							</artifactSet>
 							<filters>
@@ -354,7 +354,7 @@ under the License.
 									</includes>
 								</filter>
 								<filter>
-									<artifact>org.apache.flink:flink-storm-examples_2.10</artifact>
+									<artifact>org.apache.flink:flink-storm-examples_${scala.binary.version}</artifact>
 									<includes>
 										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class
 										</include>
@@ -366,7 +366,7 @@ under the License.
 									</includes>
 								</filter>
 								<filter>
-									<artifact>org.apache.flink:flink-storm_2.10</artifact>
+									<artifact>org.apache.flink:flink-storm_${scala.binary.version}</artifact>
 									<includes>
 										<include>org/apache/flink/storm/api/*.class</include>
 										<include>org/apache/flink/storm/util/*.class</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 5189b3b..a10ff68 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-storm_2.10</artifactId>
+	<artifactId>flink-storm_${scala.binary.version}</artifactId>
 	<name>flink-storm</name>
 
 	<packaging>jar</packaging>
@@ -65,7 +65,7 @@ under the License.
 		<!-- Core streaming API -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -174,7 +174,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index 2df0231..2381d70 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -31,7 +31,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-streaming-contrib_2.10</artifactId>
+	<artifactId>flink-streaming-contrib_${scala.binary.version}</artifactId>
 	<name>flink-streaming-contrib</name>
 
 	<packaging>jar</packaging>
@@ -42,19 +42,19 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -62,7 +62,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-contrib/flink-tweet-inputformat/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/pom.xml b/flink-contrib/flink-tweet-inputformat/pom.xml
index 5e0d79c..9328bb6 100644
--- a/flink-contrib/flink-tweet-inputformat/pom.xml
+++ b/flink-contrib/flink-tweet-inputformat/pom.xml
@@ -31,7 +31,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-tweet-inputformat_2.10</artifactId>
+	<artifactId>flink-tweet-inputformat_${scala.binary.version}</artifactId>
 	<name>flink-tweet-inputformat</name>
 
 	<packaging>jar</packaging>
@@ -44,12 +44,12 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 6d8debf..0493b98 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-dist_2.10</artifactId>
+	<artifactId>flink-dist_${scala.binary.version}</artifactId>
 	<name>flink-dist</name>
 	<packaging>jar</packaging>
 
@@ -51,61 +51,61 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime-web_2.10</artifactId>
+			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
+			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro_2.10</artifactId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-python_2.10</artifactId>
+			<artifactId>flink-python_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala-shell_2.10</artifactId>
+			<artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -123,19 +123,19 @@ under the License.
         
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-mesos_2.10</artifactId>
+			<artifactId>flink-mesos_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-yarn_2.10</artifactId>
+			<artifactId>flink-yarn_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -163,7 +163,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-gelly-examples_2.10</artifactId>
+			<artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -214,42 +214,42 @@ under the License.
 		<!-- start optional Flink libraries -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-cep_2.10</artifactId>
+			<artifactId>flink-cep_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-cep-scala_2.10</artifactId>
+			<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-gelly_2.10</artifactId>
+			<artifactId>flink-gelly_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-gelly-scala_2.10</artifactId>
+			<artifactId>flink-gelly-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-ml_2.10</artifactId>
+			<artifactId>flink-ml_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index c3385e1..909beb4 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -39,7 +39,7 @@ under the License.
 			<useTransitiveFiltering>true</useTransitiveFiltering>
 
 			<includes>
-				<include>org.apache.flink:flink-python_2.10</include>
+				<include>org.apache.flink:flink-python_${scala.binary.version}</include>
 				<include>org.slf4j:slf4j-log4j12</include>
 				<include>log4j:log4j</include>
 			</includes>
@@ -49,7 +49,7 @@ under the License.
 	<files>
 		<!-- copy fat jar -->
 		<file>
-			<source>target/flink-dist_2.10-${project.version}.jar</source>
+			<source>target/flink-dist_${scala.binary.version}-${project.version}.jar</source>
 			<outputDirectory>lib/</outputDirectory>
 			<fileMode>0644</fileMode>
 		</file>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 0386b92..23846be 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -31,47 +31,47 @@
 	<files>
 		<!-- CEP -->
 		<file>
-			<source>../flink-libraries/flink-cep/target/flink-cep_2.10-${project.version}.jar</source>
+			<source>../flink-libraries/flink-cep/target/flink-cep_${scala.binary.version}-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-cep_2.10-${project.version}.jar</destName>
+			<destName>flink-cep_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
 		<file>
-			<source>../flink-libraries/flink-cep-scala/target/flink-cep-scala_2.10-${project.version}.jar</source>
+			<source>../flink-libraries/flink-cep-scala/target/flink-cep-scala_${scala.binary.version}-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-cep-scala_2.10-${project.version}.jar</destName>
+			<destName>flink-cep-scala_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
 		<!-- Gelly -->
 		<file>
-			<source>../flink-libraries/flink-gelly/target/flink-gelly_2.10-${project.version}.jar</source>
+			<source>../flink-libraries/flink-gelly/target/flink-gelly_${scala.binary.version}-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-gelly_2.10-${project.version}.jar</destName>
+			<destName>flink-gelly_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
 		<file>
-			<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar</source>
+			<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_${scala.binary.version}-${project.version}-jar-with-dependencies.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-gelly-scala_2.10-${project.version}.jar</destName>
+			<destName>flink-gelly-scala_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
 		<!-- TableAPI-->
 		<file>
-			<source>../flink-libraries/flink-table/target/flink-table_2.10-${project.version}.jar</source>
+			<source>../flink-libraries/flink-table/target/flink-table_${scala.binary.version}-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-table_2.10-${project.version}.jar</destName>
+			<destName>flink-table_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 
 		<!-- ML -->
 		<file>
-			<source>../flink-libraries/flink-ml/target/flink-ml_2.10-${project.version}-jar-with-dependencies.jar</source>
+			<source>../flink-libraries/flink-ml/target/flink-ml_${scala.binary.version}-${project.version}-jar-with-dependencies.jar</source>
 			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-ml_2.10-${project.version}.jar</destName>
+			<destName>flink-ml_${scala.binary.version}-${project.version}.jar</destName>
 			<fileMode>0644</fileMode>
 		</file>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-examples/flink-examples-batch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml
index 269f5a6..cbff7e2 100644
--- a/flink-examples/flink-examples-batch/pom.xml
+++ b/flink-examples/flink-examples-batch/pom.xml
@@ -23,12 +23,12 @@ under the License.
 
 	<parent>
 		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-examples_2.10</artifactId>
+		<artifactId>flink-examples_${scala.binary.version}</artifactId>
 		<version>1.4-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-examples-batch_2.10</artifactId>
+	<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 	<name>flink-examples-batch</name>
 	<packaging>jar</packaging>
 
@@ -41,7 +41,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_2.10</artifactId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 	</dependencies>
@@ -363,14 +363,14 @@ under the License.
 						</goals>
 						<configuration> 
 							<target>
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-EnumTriangles.jar" tofile="${project.basedir}/target/EnumTriangles.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
-								<copy file="${project.basedir}/target/flink-examples-batch_2.10-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-EnumTriangles.jar" tofile="${project.basedir}/target/EnumTriangles.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch_${scala.binary.version}-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
 							</target>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 2bacf25..b52116b 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -24,12 +24,12 @@ under the License.
 
 	<parent>
 		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-examples_2.10</artifactId>
+		<artifactId>flink-examples_${scala.binary.version}</artifactId>
 		<version>1.4-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-examples-streaming_2.10</artifactId>
+	<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
 	<name>flink-examples-streaming</name>
 
 	<packaging>jar</packaging>
@@ -40,31 +40,31 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_2.10</artifactId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-twitter_2.10</artifactId>
+			<artifactId>flink-connector-twitter_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -72,14 +72,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
@@ -116,7 +116,7 @@ under the License.
 								<!-- For WordCount example data -->
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_2.10</artifactId>
+									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>
@@ -126,7 +126,7 @@ under the License.
 								<!-- For JSON utilities -->
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-connector-twitter_2.10</artifactId>
+									<artifactId>flink-connector-twitter_${scala.binary.version}</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>
@@ -475,13 +475,13 @@ under the License.
 						</goals>
 						<configuration>
 							<target>
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-IncrementalLearning.jar" tofile="${project.basedir}/target/IncrementalLearning.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketWindowWordCount.jar" tofile="${project.basedir}/target/SocketWindowWordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-IncrementalLearning.jar" tofile="${project.basedir}/target/IncrementalLearning.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_${scala.binary.version}-${project.version}-SocketWindowWordCount.jar" tofile="${project.basedir}/target/SocketWindowWordCount.jar" />
 							</target>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-examples/flink-examples-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index ce1c701..e59d8c6 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -24,27 +24,27 @@ under the License.
 
 	<parent>
 		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-examples_2.10</artifactId>
+		<artifactId>flink-examples_${scala.binary.version}</artifactId>
 		<version>1.4-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
 	<name>flink-examples-table</name>
-	<artifactId>flink-examples-table_2.10</artifactId>
+	<artifactId>flink-examples-table_${scala.binary.version}</artifactId>
 	<packaging>jar</packaging>
 
 	<dependencies>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -78,7 +78,7 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>flink-table-examples_2.10</id>
+						<id>flink-table-examples_${scala.binary.version}</id>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index 409c8f1..644d9eb 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -28,7 +28,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-examples_2.10</artifactId>
+	<artifactId>flink-examples_${scala.binary.version}</artifactId>
 	<name>flink-examples</name>
 	<packaging>pom</packaging>
 
@@ -44,7 +44,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index c354c80..446d167 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-fs-tests_2.10</artifactId>
+	<artifactId>flink-fs-tests_${scala.binary.version}</artifactId>
 	<name>flink-fs-tests</name>
 
 	<packaging>jar</packaging>
@@ -45,7 +45,7 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -53,28 +53,28 @@ under the License.
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_2.10</artifactId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro_2.10</artifactId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 8de3b92..fb7da02 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -30,7 +30,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-java8_2.10</artifactId>
+	<artifactId>flink-java8_${scala.binary.version}</artifactId>
 	<name>flink-java8</name>
 
 	<packaging>jar</packaging>
@@ -51,13 +51,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_2.10</artifactId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -65,14 +65,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-cep_2.10</artifactId>
+			<artifactId>flink-cep_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -116,7 +116,7 @@ under the License.
 							<artifactItems>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_2.10</artifactId>
+									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-cep-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml
index ca5bbce..c9b4a46 100644
--- a/flink-libraries/flink-cep-scala/pom.xml
+++ b/flink-libraries/flink-cep-scala/pom.xml
@@ -30,7 +30,7 @@ under the License.
         <relativePath>..</relativePath>
     </parent>
     
-    <artifactId>flink-cep-scala_2.10</artifactId>
+    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
     <name>flink-cep-scala</name>
     <packaging>jar</packaging>
 
@@ -40,13 +40,13 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cep_2.10</artifactId>
+            <artifactId>flink-cep_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -83,14 +83,14 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests_2.10</artifactId>
+            <artifactId>flink-tests_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>
@@ -98,7 +98,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>
@@ -106,7 +106,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cep_2.10</artifactId>
+            <artifactId>flink-cep_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/35c08712/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index 7f89c24..35045c0 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -30,7 +30,7 @@ under the License.
         <relativePath>..</relativePath>
     </parent>
 
-    <artifactId>flink-cep_2.10</artifactId>
+    <artifactId>flink-cep_${scala.binary.version}</artifactId>
     <name>flink-cep</name>
     <packaging>jar</packaging>
 
@@ -47,7 +47,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -62,14 +62,14 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -77,7 +77,7 @@ under the License.
 
        <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.10</artifactId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -85,7 +85,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>


[09/13] flink git commit: [FLINK-5679] [tests] Refactor PartitionedStateCheckpointingITCase

Posted by se...@apache.org.
[FLINK-5679] [tests] Refactor PartitionedStateCheckpointingITCase

  - Massively speeds up the test by using fewer test elements and better coordination
    of source throttling and checkpointing.

  - Makes the test compatible with Windows by using proper URI encoding

  - Drops use of deprecated ValueState constructors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72aa2622
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72aa2622
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72aa2622

Branch: refs/heads/master
Commit: 72aa2622299ea492c74c4ab0d3e00f0d323df4a9
Parents: e10cdaa
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 11 10:55:22 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 14:11:07 2017 +0200

----------------------------------------------------------------------
 .../KeyedStateCheckpointingITCase.java          | 401 +++++++++++++++++++
 .../PartitionedStateCheckpointingITCase.java    | 308 --------------
 2 files changed, 401 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72aa2622/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
new file mode 100644
index 0000000..147d385
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ * 
+ * The test triggers a failure after a while and verifies that, after
+ * completion, the state reflects the "exactly once" semantics.
+ * 
+ * It is designed to check partitioned states.
+ */
+@SuppressWarnings("serial")
+public class KeyedStateCheckpointingITCase extends TestLogger {
+
+	protected static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
+
+	protected static final int NUM_STRINGS = 10_000;
+	protected static final int NUM_KEYS = 40;
+
+	protected static final int NUM_TASK_MANAGERS = 2;
+	protected static final int NUM_TASK_SLOTS = 2;
+	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+	// ------------------------------------------------------------------------
+
+	private static LocalFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void stopCluster() throws Exception{
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	@Test
+	public void testWithMemoryBackendSync() throws Exception {
+		MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
+		testProgramWithBackend(syncMemBackend);
+	}
+
+	@Test
+	public void testWithMemoryBackendAsync() throws Exception {
+		MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+		testProgramWithBackend(asyncMemBackend);
+	}
+
+	@Test
+	public void testWithFsBackendSync() throws Exception {
+		FsStateBackend syncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), false);
+		testProgramWithBackend(syncFsBackend);
+	}
+
+	@Test
+	public void testWithFsBackendAsync() throws Exception {
+		FsStateBackend asyncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), true);
+		testProgramWithBackend(asyncFsBackend);
+	}
+
+	@Test
+	public void testWithRocksDbBackendFull() throws Exception {
+		RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
+		fullRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
+
+		testProgramWithBackend(fullRocksDbBackend);
+	}
+
+	@Test
+	public void testWithRocksDbBackendIncremental() throws Exception {
+		RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+		incRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
+
+		testProgramWithBackend(incRocksDbBackend);
+	}
+
+	// ------------------------------------------------------------------------
+
+	protected void testProgramWithBackend(AbstractStateBackend stateBackend) throws Exception {
+		assertEquals("Broken test setup", 0, (NUM_STRINGS / 2) % NUM_KEYS);
+
+		final StreamExecutionEnvironment env = new TestStreamEnvironment(cluster, PARALLELISM);
+		env.setParallelism(PARALLELISM);
+		env.enableCheckpointing(500);
+		env.getConfig().disableSysoutLogging();
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		env.setStateBackend(stateBackend);
+
+		// compute when (randomly) the failure should happen
+		final int failurePosMin = (int) (0.6 * NUM_STRINGS / PARALLELISM);
+		final int failurePosMax = (int) (0.8 * NUM_STRINGS / PARALLELISM);
+		final int failurePos = (new Random().nextInt(failurePosMax - failurePosMin) + failurePosMin);
+
+		final DataStream<Integer> stream1 = env.addSource(
+				new IntGeneratingSourceFunction(NUM_STRINGS / 2, NUM_STRINGS / 4));
+
+		final DataStream<Integer> stream2 = env.addSource(
+				new IntGeneratingSourceFunction(NUM_STRINGS / 2, NUM_STRINGS / 4));
+
+		stream1.union(stream2)
+				.keyBy(new IdentityKeySelector<Integer>())
+				.map(new OnceFailingPartitionedSum(failurePos))
+				.keyBy(0)
+				.addSink(new CounterSink());
+
+		env.execute();
+
+		// verify that we counted exactly right
+		assertEquals(NUM_KEYS, CounterSink.ALL_COUNTS.size());
+		assertEquals(NUM_KEYS, OnceFailingPartitionedSum.ALL_SUMS.size());
+
+		for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.ALL_SUMS.entrySet()) {
+			assertEquals((long) sum.getKey() * NUM_STRINGS / NUM_KEYS, sum.getValue().longValue());
+		}
+		for (long count : CounterSink.ALL_COUNTS.values()) {
+			assertEquals(NUM_STRINGS / NUM_KEYS, count);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A source that generates a sequence of integers and throttles down until a checkpoint
+	 * has happened.
+	 */
+	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> 
+		implements ListCheckpointed<Integer>, CheckpointListener {
+
+		private final int numElements;
+		private final int checkpointLatestAt;
+
+		private int lastEmitted = -1;
+
+		private boolean checkpointHappened;
+
+		private volatile boolean isRunning = true;
+
+		IntGeneratingSourceFunction(int numElements, int checkpointLatestAt) {
+			this.numElements = numElements;
+			this.checkpointLatestAt = checkpointLatestAt;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			int nextElement = lastEmitted >= 0 ? lastEmitted + step :
+				getRuntimeContext().getIndexOfThisSubtask();
+
+			while (isRunning && nextElement < numElements) {
+
+				// throttle / block if we are still waiting for the checkpoint
+				if (!checkpointHappened) {
+					if (nextElement < checkpointLatestAt) {
+						// only throttle
+						Thread.sleep(1);
+					} else {
+						// hard block
+						synchronized (this) {
+							while (!checkpointHappened) {
+								this.wait();
+							}
+						}
+					}
+				}
+
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (lockingObject) {
+					ctx.collect(nextElement % NUM_KEYS);
+					lastEmitted = nextElement;
+				}
+
+				nextElement += step;
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(lastEmitted);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			assertEquals("Test failed due to unexpected recovered state size", 1, state.size());
+			lastEmitted = state.get(0);
+			checkpointHappened = true;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			synchronized (this) {
+				checkpointHappened = true;
+				this.notifyAll();
+			}
+		}
+	}
+
+	private static class OnceFailingPartitionedSum
+			extends RichMapFunction<Integer, Tuple2<Integer, Long>>
+			implements ListCheckpointed<Integer> {
+
+		private static final Map<Integer, Long> ALL_SUMS = new ConcurrentHashMap<>();
+
+		private final int failurePos;
+		private int count;
+
+		private boolean shouldFail = true;
+
+		private transient ValueState<Long> sum;
+
+		OnceFailingPartitionedSum(int failurePos) {
+			this.failurePos = failurePos;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			sum = getRuntimeContext().getState(new ValueStateDescriptor<>("my_state", Long.class));
+		}
+
+		@Override
+		public Tuple2<Integer, Long> map(Integer value) throws Exception {
+			if (shouldFail && count++ >= failurePos) {
+				shouldFail = false;
+				throw new Exception("Test Failure");
+			}
+
+			Long oldSum = sum.value();
+			long currentSum = (oldSum == null ? 0L : oldSum) + value;
+
+			sum.update(currentSum);
+			ALL_SUMS.put(value, currentSum);
+			return new Tuple2<>(value, currentSum);
+		}
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(count);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			assertEquals("Test failed due to unexpected recovered state size", 1, state.size());
+			count = state.get(0);
+			shouldFail = false;
+		}
+
+		@Override
+		public void close() throws Exception {
+			if (shouldFail) {
+				fail("Test ineffective: Function cleanly finished without ever failing.");
+			}
+		}
+	}
+
+	private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
+
+		private static final Map<Integer, Long> ALL_COUNTS = new ConcurrentHashMap<>();
+
+		private transient ValueState<NonSerializableLong> aCounts;
+		private transient ValueState<Long> bCounts;
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			aCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("a", NonSerializableLong.class));
+			bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class));
+		}
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			final NonSerializableLong acRaw = aCounts.value();
+			final Long bcRaw = bCounts.value();
+
+			final long ac = acRaw == null ? 0L : acRaw.value;
+			final long bc = bcRaw == null ? 0L : bcRaw;
+
+			assertEquals(ac, bc);
+
+			long currentCount = ac + 1;
+			aCounts.update(NonSerializableLong.of(currentCount));
+			bCounts.update(currentCount);
+
+			ALL_COUNTS.put(value.f0, currentCount);
+		}
+	}
+
+	public static class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  data types
+	// ------------------------------------------------------------------------
+
+	public static class NonSerializableLong {
+
+		public long value;
+
+		private NonSerializableLong(long value) {
+			this.value = value;
+		}
+
+		public static NonSerializableLong of(long value) {
+			return new NonSerializableLong(value);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return this == obj || 
+					obj != null && obj.getClass() == getClass() && ((NonSerializableLong) obj).value == this.value;
+		}
+
+		@Override
+		public int hashCode() {
+			return (int) (value ^ (value >>> 32));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72aa2622/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
deleted file mode 100644
index 517c82b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * A simple test that runs a streaming topology with checkpointing enabled.
- * 
- * The test triggers a failure after a while and verifies that, after
- * completion, the state reflects the "exactly once" semantics.
- * 
- * It is designed to check partitioned states.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
-
-	private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
-
-	final long NUM_STRINGS = 10_000_000L;
-	final static int NUM_KEYS = 40;
-
-	@Parameterized.Parameters
-	public static Collection<AbstractStateBackend> parameters() throws IOException {
-		TemporaryFolder tempFolder = new TemporaryFolder();
-		tempFolder.create();
-
-		MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
-		MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
-
-		FsStateBackend syncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), false);
-		FsStateBackend asyncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), true);
-
-		RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
-		fullRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
-		RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
-		incRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
-		return Arrays.asList(
-			syncMemBackend,
-			asyncMemBackend,
-			syncFsBackend,
-			asyncFsBackend,
-			fullRocksDbBackend,
-			incRocksDbBackend);
-	}
-
-	@Parameterized.Parameter
-	public AbstractStateBackend stateBackend;
-
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", (NUM_STRINGS/2) % NUM_KEYS == 0);
-
-		env.setStateBackend(stateBackend);
-
-		DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
-		DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
-
-		stream1.union(stream2)
-				.keyBy(new IdentityKeySelector<Integer>())
-				.map(new OnceFailingPartitionedSum(NUM_STRINGS))
-				.keyBy(0)
-				.addSink(new CounterSink());
-	}
-
-	@Override
-	public void postSubmit() {
-		// verify that we counted exactly right
-		for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
-			assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue());
-		}
-		for (Long count : CounterSink.allCounts.values()) {
-			assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count);
-		}
-
-		assertEquals(NUM_KEYS, CounterSink.allCounts.size());
-		assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size());
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Custom Functions
-	// --------------------------------------------------------------------------------------------
-
-	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> 
-		implements ListCheckpointed<Integer> {
-
-		private final long numElements;
-
-		private int index;
-		private int step;
-
-		private volatile boolean isRunning = true;
-
-		static final long[] counts = new long[PARALLELISM];
-
-		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
-		}
-
-		IntGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws IOException {
-			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			while (isRunning && index < numElements) {
-
-				synchronized (lockingObject) {
-					index += step;
-					ctx.collect(index % NUM_KEYS);
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.index);
-		}
-
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.index = state.get(0);
-		}
-	}
-
-	private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {
-
-		private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
-		
-		private static volatile boolean hasFailed = false;
-
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
-
-		private ValueState<Long> sum;
-
-		OnceFailingPartitionedSum(long numElements) {
-			this.numElements = numElements;
-			this.hasFailed = false;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.6 * numElements / getRuntimeContext()
-					.getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.8 * numElements / getRuntimeContext()
-					.getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = 0;
-			sum = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("my_state", Long.class, 0L));
-		}
-
-		@Override
-		public Tuple2<Integer, Long> map(Integer value) throws Exception {
-			count++;
-
-			if (!hasFailed && count >= failurePos) {
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-
-			long currentSum = sum.value() + value;
-			sum.update(currentSum);
-			allSums.put(value, currentSum);
-			return new Tuple2<Integer, Long>(value, currentSum);
-		}
-	}
-
-	private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
-
-		private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
-		
-		private ValueState<NonSerializableLong> aCounts;
-		private ValueState<Long> bCounts;
-
-		@Override
-		public void open(Configuration parameters) throws IOException {
-			
-			aCounts = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L)));
-			
-			bCounts = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("b", Long.class, 0L));
-		}
-
-		@Override
-		public void invoke(Tuple2<Integer, Long> value) throws Exception {
-			long ac = aCounts.value().value;
-			long bc = bCounts.value();
-			assertEquals(ac, bc);
-			
-			long currentCount = ac + 1;
-			aCounts.update(NonSerializableLong.of(currentCount));
-			bCounts.update(currentCount);
-			
-			allCounts.put(value.f0, currentCount);
-		}
-	}
-	
-	public static class NonSerializableLong {
-		public Long value;
-
-		private NonSerializableLong(long value) {
-			this.value = value;
-		}
-
-		public static NonSerializableLong of(long value) {
-			return new NonSerializableLong(value);
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) return true;
-			if (o == null || getClass() != o.getClass()) return false;
-
-			NonSerializableLong that = (NonSerializableLong) o;
-
-			return value.equals(that.value);
-
-		}
-
-		@Override
-		public int hashCode() {
-			return value.hashCode();
-		}
-	}
-	
-	public static class IdentityKeySelector<T> implements KeySelector<T, T> {
-
-		@Override
-		public T getKey(T value) throws Exception {
-			return value;
-		}
-
-	}
-}


[08/13] flink git commit: [FLINK-6531] [checkpoints] Ensure proper classloading for user-defined checkpoint hooks

Posted by se...@apache.org.
[FLINK-6531] [checkpoints] Ensure proper classloading for user-defined checkpoint hooks

This closes #3868


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa8a90a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa8a90a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa8a90a5

Branch: refs/heads/master
Commit: aa8a90a588b4d72fc585731bea233495f0690364
Parents: 72aa262
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 22:03:49 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 14:11:07 2017 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionGraphBuilder.java   |  13 +-
 .../tasks/JobCheckpointingSettings.java         |  12 +-
 .../CheckpointSettingsSerializableTest.java     | 122 +++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  20 ++-
 .../WithMasterCheckpointHookConfigTest.java     |  10 +-
 5 files changed, 167 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa8a90a5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index aa28fbc..0e76cfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -240,13 +241,21 @@ public class ExecutionGraphBuilder {
 
 			// instantiate the user-defined checkpoint hooks
 
-			final MasterTriggerRestoreHook.Factory[] hookFactories = snapshotSettings.getMasterHooks();
+			final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
 			final List<MasterTriggerRestoreHook<?>> hooks;
 
-			if (hookFactories == null || hookFactories.length == 0) {
+			if (serializedHooks == null) {
 				hooks = Collections.emptyList();
 			}
 			else {
+				final MasterTriggerRestoreHook.Factory[] hookFactories;
+				try {
+					hookFactories = serializedHooks.deserializeValue(classLoader);
+				}
+				catch (IOException | ClassNotFoundException e) {
+					throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
+				}
+
 				hooks = new ArrayList<>(hookFactories.length);
 				for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
 					hooks.add(factory.create());

http://git-wip-us.apache.org/repos/asf/flink/blob/aa8a90a5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index 3dd037e..a30a2ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -58,7 +59,8 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 	private final StateBackend defaultStateBackend;
 
 	/** (Factories for) hooks that are executed on the checkpoint coordinator */
-	private final MasterTriggerRestoreHook.Factory[] masterHooks;
+	@Nullable
+	private final SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks;
 
 	/**
 	 * Flag indicating whether exactly once checkpoint mode has been configured.
@@ -96,7 +98,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
 			@Nullable StateBackend defaultStateBackend,
-			@Nullable MasterTriggerRestoreHook.Factory[] masterHooks,
+			@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
 			boolean isExactlyOnce) {
 
 		// sanity checks
@@ -115,8 +117,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
 		this.defaultStateBackend = defaultStateBackend;
 		this.isExactlyOnce = isExactlyOnce;
-
-		this.masterHooks = masterHooks != null ? masterHooks : new MasterTriggerRestoreHook.Factory[0];
+		this.masterHooks = masterHooks;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -158,7 +159,8 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		return defaultStateBackend;
 	}
 
-	public MasterTriggerRestoreHook.Factory[] getMasterHooks() {
+	@Nullable
+	public SerializedValue<MasterTriggerRestoreHook.Factory[]> getMasterHooks() {
 		return masterHooks;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aa8a90a5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
new file mode 100644
index 0000000..0246180
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that the checkpoint settings serialize correctly
+ * in the presence of user-defined objects.
+ */
+public class CheckpointSettingsSerializableTest extends TestLogger {
+
+	@Test
+	public void testClassLoaderForCheckpointHooks() throws Exception {
+		final ClassLoader classLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
+		final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(classLoader);
+
+		final MasterTriggerRestoreHook.Factory[] hooks = {
+				new TestFactory(outOfClassPath) };
+		final SerializedValue<MasterTriggerRestoreHook.Factory[]> serHooks = new SerializedValue<>(hooks);
+
+		final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				1000L,
+				10000L,
+				0L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				null,
+				serHooks,
+				true);
+
+		final JobGraph jobGraph = new JobGraph(new JobID(), "test job");
+		jobGraph.setSnapshotSettings(checkpointingSettings);
+
+		// to serialize/deserialize the job graph to see if the behavior is correct under
+		// distributed execution
+		final JobGraph copy = CommonTestUtils.createCopySerializable(jobGraph);
+
+		final ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+				null,
+				copy,
+				new Configuration(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				mock(SlotProvider.class),
+				classLoader,
+				new StandaloneCheckpointRecoveryFactory(),
+				Time.seconds(10),
+				new NoRestartStrategy(),
+				new UnregisteredMetricsGroup(),
+				10,
+				log);
+
+		assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestFactory implements MasterTriggerRestoreHook.Factory {
+
+		private static final long serialVersionUID = -612969579110202607L;
+		
+		private final Serializable payload;
+
+		TestFactory(Serializable payload) {
+			this.payload = payload;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public <V> MasterTriggerRestoreHook<V> create() {
+			MasterTriggerRestoreHook<V> hook = mock(MasterTriggerRestoreHook.class);
+			when(hook.getIdentifier()).thenReturn("id");
+			return hook;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa8a90a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index b3a6cf8..6d1af72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -66,6 +66,8 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -642,6 +644,22 @@ public class StreamingJobGraphGenerator {
 			}
 		}
 
+		// because the hooks can have user-defined code, they need to be stored as
+		// eagerly serialized values
+		final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
+		if (hooks.isEmpty()) {
+			serializedHooks = null;
+		} else {
+			try {
+				MasterTriggerRestoreHook.Factory[] asArray =
+						hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
+				serializedHooks = new SerializedValue<>(asArray);
+			}
+			catch (IOException e) {
+				throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
+			}
+		}
+
 		//  --- done, put it all together ---
 
 		JobCheckpointingSettings settings = new JobCheckpointingSettings(
@@ -650,7 +668,7 @@ public class StreamingJobGraphGenerator {
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
 				streamGraph.getStateBackend(),
-				hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]),
+				serializedHooks,
 				isExactlyOnce);
 
 		jobGraph.setSnapshotSettings(settings);

http://git-wip-us.apache.org/repos/asf/flink/blob/aa8a90a5/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index b5a95eb..8065cf1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -80,10 +81,15 @@ public class WithMasterCheckpointHookConfigTest {
 			.addSink(new DiscardingSink<String>());
 
 		final JobGraph jg = env.getStreamGraph().getJobGraph();
-		assertEquals(hooks.size(), jg.getCheckpointingSettings().getMasterHooks().length);
+
+		SerializedValue<Factory[]> serializedConfiguredHooks = jg.getCheckpointingSettings().getMasterHooks();
+		assertNotNull(serializedConfiguredHooks);
+
+		Factory[] configuredHooks = serializedConfiguredHooks.deserializeValue(getClass().getClassLoader());
+		assertEquals(hooks.size(), configuredHooks.length);
 
 		// check that all hooks are contained and exist exactly once
-		for (Factory f : jg.getCheckpointingSettings().getMasterHooks()) {
+		for (Factory f : configuredHooks) {
 			MasterTriggerRestoreHook<?> hook = f.create();
 			assertTrue(hooks.remove(hook));
 		}


[03/13] flink git commit: [hotfix] [core] Minor code cleanups in JavaSerializer and SerializerTestBase

Posted by se...@apache.org.
[hotfix] [core] Minor code cleanups in JavaSerializer and SerializerTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70c48aaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70c48aaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70c48aaa

Branch: refs/heads/master
Commit: 70c48aaa65a33d1a375ffa5838cf0e9532a4c202
Parents: 6f8022e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 11:28:55 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:51 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/kryo/JavaSerializer.java |  4 ++--
 .../api/common/typeutils/SerializerTestBase.java    | 16 ++++++++++++++--
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70c48aaa/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
index a51647c..711c814 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -45,7 +45,7 @@ public class JavaSerializer<T> extends Serializer<T> {
 
 	public JavaSerializer() {}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Override
 	public void write(Kryo kryo, Output output, T o) {
 		try {
@@ -62,7 +62,7 @@ public class JavaSerializer<T> extends Serializer<T> {
 		}
 	}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Override
 	public T read(Kryo kryo, Input input, Class aClass) {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/70c48aaa/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a846703..f2879ac 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -55,7 +55,13 @@ import org.junit.Test;
 public abstract class SerializerTestBase<T> extends TestLogger {
 	
 	protected abstract TypeSerializer<T> createSerializer();
-	
+
+	/**
+	 * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
+	 * 
+	 * <p>The expected length should be positive, for fix-length data types, or {@code -1} for
+	 * variable-length types.
+	 */
 	protected abstract int getLength();
 	
 	protected abstract Class<T> getTypeClass();
@@ -124,9 +130,15 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 	
 	@Test
 	public void testGetLength() {
+		final int len = getLength();
+
+		if (len == 0) {
+			fail("Broken serializer test base - zero length cannot be the expected length");
+		}
+
 		try {
 			TypeSerializer<T> serializer = getSerializer();
-			assertEquals(getLength(), serializer.getLength());
+			assertEquals(len, serializer.getLength());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());


[06/13] flink git commit: [hotfix] [gelly] Support log output when running examples in the IDE

Posted by se...@apache.org.
[hotfix] [gelly] Support log output when running examples in the IDE

This adds the dependencies / config files necessary for log output


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aafa5794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aafa5794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aafa5794

Branch: refs/heads/master
Commit: aafa579402c4e1b960e929d1b449505ba820a60a
Parents: bc27963
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 16:39:53 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:45:52 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-gelly-examples/pom.xml    | 15 ++++++++++
 .../src/main/resources/log4j.properties         | 23 ++++++++++++++++
 .../src/main/resources/logback.xml              | 29 ++++++++++++++++++++
 3 files changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aafa5794/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index 6fb0979..009781f 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -75,6 +75,21 @@
 			<scope>compile</scope>
 		</dependency>
 
+		<!-- Add a logging Framework, to make the examples produce -->
+		<!--             logs when executing in the IDE            -->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/aafa5794/flink-libraries/flink-gelly-examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/resources/log4j.properties b/flink-libraries/flink-gelly-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..da32ea0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/aafa5794/flink-libraries/flink-gelly-examples/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/resources/logback.xml b/flink-libraries/flink-gelly-examples/src/main/resources/logback.xml
new file mode 100644
index 0000000..95f2d04
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/resources/logback.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file


[13/13] flink git commit: [hotfix] [build] Drop transitive jersey/jettison/servlet dependencies pulled via Hadoop

Posted by se...@apache.org.
[hotfix] [build] Drop transitive jersey/jettison/servlet dependencies pulled via Hadoop


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fca8caea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fca8caea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fca8caea

Branch: refs/heads/master
Commit: fca8caea70cb5b76d4d45bdac438d8e3c3fd3315
Parents: aa8a90a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 11 15:12:10 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 16:24:45 2017 +0200

----------------------------------------------------------------------
 .../flink-shaded-hadoop2/pom.xml                | 61 ++++++++++++++++----
 1 file changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fca8caea/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
index 9ac033e..a1159d6 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
@@ -70,20 +70,23 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-api-2.1</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-2.1</artifactId>
 				</exclusion>
-
 				<exclusion>
 					<groupId>org.eclipse.jdt</groupId>
 					<artifactId>core</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>com.sun.jersey</groupId>
@@ -106,6 +109,10 @@ under the License.
 					<artifactId>jasper-runtime</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>
@@ -191,6 +198,10 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-api-2.1</artifactId>
 				</exclusion>
 				<exclusion>
@@ -202,8 +213,8 @@ under the License.
 					<artifactId>core</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>com.sun.jersey</groupId>
@@ -226,6 +237,10 @@ under the License.
 					<artifactId>jasper-runtime</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>
@@ -299,6 +314,10 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-api-2.1</artifactId>
 				</exclusion>
 				<exclusion>
@@ -322,8 +341,8 @@ under the License.
 					<artifactId>netty</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>com.sun.jersey</groupId>
@@ -346,6 +365,10 @@ under the License.
 					<artifactId>jasper-runtime</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>
@@ -419,6 +442,10 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-api-2.1</artifactId>
 				</exclusion>
 				<exclusion>
@@ -442,8 +469,8 @@ under the License.
 					<artifactId>netty</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>com.sun.jersey</groupId>
@@ -466,6 +493,10 @@ under the License.
 					<artifactId>jasper-runtime</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>
@@ -539,6 +570,10 @@ under the License.
 				</exclusion>
 				<exclusion>
 					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
 					<artifactId>jsp-api-2.1</artifactId>
 				</exclusion>
 				<exclusion>
@@ -562,8 +597,8 @@ under the License.
 					<artifactId>netty</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>com.sun.jersey</groupId>
@@ -586,6 +621,10 @@ under the License.
 					<artifactId>jasper-runtime</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>


[12/13] flink git commit: [hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest

Posted by se...@apache.org.
[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e10cdaa1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e10cdaa1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e10cdaa1

Branch: refs/heads/master
Commit: e10cdaa11c514d017d9cf92f88184f9e29d51fa9
Parents: 35c0871
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 20:57:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 14:11:07 2017 +0200

----------------------------------------------------------------------
 .../distributedCache/DistributedCacheTest.java  | 56 ++++++++++++++++----
 1 file changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e10cdaa1/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 19bcf76..21aa40a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
-import java.io.*;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 
+public class DistributedCacheTest extends AbstractTestBase {
 
-public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 	public static final String data
 			= "machen\n"
 			+ "zeit\n"
@@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 			+ "keiner\n"
 			+ "meine\n";
 
+	private static final int PARALLELISM = 4;
+
+	private static LocalFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
+		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public DistributedCacheTest() {
+		super(new Configuration());
+	}
+	
+	// ------------------------------------------------------------------------
 
 	@Test
 	public void testStreamingDistributedCache() throws Exception {
@@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 		@Override
 		public void open(Configuration conf) throws IOException {
 			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
-			BufferedReader reader = new BufferedReader(new FileReader(file));
-			String tempString;
-			while ((tempString = reader.readLine()) != null) {
-				wordList.add(tempString);
+			try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+				String tempString;
+				while ((tempString= reader.readLine()) != null) {
+					wordList.add(tempString);
+				}
 			}
-			reader.close();
 		}
 
 		@Override