You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/08/10 11:22:55 UTC

[1/3] flink git commit: [FLINK-7026] [asm] Introduce flink-shaded-asm-5

Repository: flink
Updated Branches:
  refs/heads/master f59de67d9 -> d6126e7ca


[FLINK-7026] [asm] Introduce flink-shaded-asm-5

This closes #4494.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65391805
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65391805
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65391805

Branch: refs/heads/master
Commit: 65391805933f52e9c99de4210c2f422bdc652a15
Parents: f59de67
Author: zentol <ch...@apache.org>
Authored: Wed Jun 28 13:23:53 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Aug 10 11:36:26 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-kinesis/pom.xml             |  4 ----
 flink-core/pom.xml                              | 12 +++++------
 .../api/java/typeutils/TypeExtractionUtils.java |  4 ++--
 flink-java/pom.xml                              |  5 ++---
 .../apache/flink/api/java/ClosureCleaner.java   |  9 ++++----
 .../flink/api/java/sca/ModifiedASMAnalyzer.java | 12 +++++------
 .../flink/api/java/sca/ModifiedASMFrame.java    | 10 ++++-----
 .../api/java/sca/NestedMethodAnalyzer.java      | 22 ++++++++++----------
 .../apache/flink/api/java/sca/TaggedValue.java  |  4 ++--
 .../apache/flink/api/java/sca/UdfAnalyzer.java  |  5 +++--
 .../flink/api/java/sca/UdfAnalyzerUtils.java    | 12 +++++------
 flink-libraries/flink-cep-scala/pom.xml         |  8 -------
 flink-libraries/flink-gelly-scala/pom.xml       |  7 -------
 flink-runtime/pom.xml                           | 11 +++++-----
 .../flink/runtime/util/DependencyVisitor.java   | 20 +++++++++---------
 .../flink/runtime/util/JarFileCreator.java      |  5 +++--
 flink-scala/pom.xml                             | 11 +++++-----
 .../apache/flink/api/scala/ClosureCleaner.scala |  4 ++--
 .../flink-shaded-curator-recipes/pom.xml        |  1 -
 flink-streaming-scala/pom.xml                   |  6 ------
 pom.xml                                         | 12 +++++------
 tools/maven/checkstyle.xml                      |  2 +-
 tools/travis_mvn_watchdog.sh                    |  2 +-
 23 files changed, 80 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 4628937..41daaa7 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -159,10 +159,6 @@ under the License.
 							<relocations combine.children="override">
 								<!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
 								<relocation>
-									<pattern>org.objectweb.asm</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
-								</relocation>
-								<relocation>
 									<pattern>com.google.protobuf</pattern>
 									<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
 								</relocation>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 6a7e78d..7039e48 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -47,6 +47,11 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-asm</artifactId>
+		</dependency>
+
 		<!-- standard utilities -->
 		<dependency>
 			<groupId>org.apache.commons</groupId>
@@ -87,13 +92,6 @@ under the License.
 			<artifactId>snappy-java</artifactId>
 		</dependency>
 
-		<!-- ASM is needed for type extraction -->
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm-all</artifactId>
-			<version>${asm.version}</version>
-		</dependency>
-
 		<!-- ================== test dependencies ================== -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index c2a01c3..41d260d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -31,8 +31,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 
-import static org.objectweb.asm.Type.getConstructorDescriptor;
-import static org.objectweb.asm.Type.getMethodDescriptor;
+import static org.apache.flink.shaded.asm5.org.objectweb.asm.Type.getConstructorDescriptor;
+import static org.apache.flink.shaded.asm5.org.objectweb.asm.Type.getMethodDescriptor;
 
 @Internal
 public class TypeExtractionUtils {

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index a996cbe..11fd89f 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -48,9 +48,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm-all</artifactId>
-			<version>${asm.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-asm</artifactId>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index dd4b5c5..6160094 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -22,10 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.util.InstantiationUtil;
 
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.ClassVisitor;
-import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.Opcodes;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.ClassVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.MethodVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Opcodes;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
index ef7f18d..ee8242b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMAnalyzer.java
@@ -20,12 +20,12 @@ package org.apache.flink.api.java.sca;
 
 import org.apache.flink.annotation.Internal;
 
-import org.objectweb.asm.tree.AbstractInsnNode;
-import org.objectweb.asm.tree.InsnList;
-import org.objectweb.asm.tree.JumpInsnNode;
-import org.objectweb.asm.tree.analysis.Analyzer;
-import org.objectweb.asm.tree.analysis.Frame;
-import org.objectweb.asm.tree.analysis.Interpreter;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.AbstractInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.InsnList;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.JumpInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Analyzer;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Frame;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Interpreter;
 
 import java.lang.reflect.Field;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
index 685a91f..f3bce3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/ModifiedASMFrame.java
@@ -20,11 +20,11 @@ package org.apache.flink.api.java.sca;
 
 import org.apache.flink.annotation.Internal;
 
-import org.objectweb.asm.tree.AbstractInsnNode;
-import org.objectweb.asm.tree.analysis.AnalyzerException;
-import org.objectweb.asm.tree.analysis.Frame;
-import org.objectweb.asm.tree.analysis.Interpreter;
-import org.objectweb.asm.tree.analysis.Value;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.AbstractInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Frame;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Interpreter;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Value;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
index d56d60a..c4e3ac4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
@@ -21,17 +21,17 @@ package org.apache.flink.api.java.sca;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.sca.TaggedValue.Tag;
 
-import org.objectweb.asm.Type;
-import org.objectweb.asm.tree.AbstractInsnNode;
-import org.objectweb.asm.tree.FieldInsnNode;
-import org.objectweb.asm.tree.IntInsnNode;
-import org.objectweb.asm.tree.LdcInsnNode;
-import org.objectweb.asm.tree.MethodInsnNode;
-import org.objectweb.asm.tree.MethodNode;
-import org.objectweb.asm.tree.TypeInsnNode;
-import org.objectweb.asm.tree.analysis.AnalyzerException;
-import org.objectweb.asm.tree.analysis.BasicInterpreter;
-import org.objectweb.asm.tree.analysis.BasicValue;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Type;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.AbstractInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.FieldInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.IntInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.LdcInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.MethodInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.MethodNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.TypeInsnNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.BasicInterpreter;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.BasicValue;
 
 import java.util.ArrayList;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
index aab2cf5..ece022a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.java.sca;
 
 import org.apache.flink.annotation.Internal;
 
-import org.objectweb.asm.Type;
-import org.objectweb.asm.tree.analysis.BasicValue;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Type;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.BasicValue;
 
 import java.util.HashMap;
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
index 6022959..0a0f0f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -37,8 +37,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.sca.TaggedValue.Input;
 
-import org.objectweb.asm.Type;
-import org.objectweb.asm.tree.MethodNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Type;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.MethodNode;
+
 import org.slf4j.Logger;
 
 import java.lang.reflect.Method;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index 2844aea..8a76ed2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -26,12 +26,12 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.Type;
-import org.objectweb.asm.tree.ClassNode;
-import org.objectweb.asm.tree.MethodNode;
-import org.objectweb.asm.tree.analysis.BasicValue;
-import org.objectweb.asm.tree.analysis.Value;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Type;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.ClassNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.MethodNode;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.BasicValue;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.tree.analysis.Value;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-libraries/flink-cep-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml
index c9b4a46..7820c6b 100644
--- a/flink-libraries/flink-cep-scala/pom.xml
+++ b/flink-libraries/flink-cep-scala/pom.xml
@@ -51,14 +51,6 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <!-- We need to add this explicitly due to shading -->
-
-        <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-            <version>${asm.version}</version>
-        </dependency>
-
         <!-- the dependencies below are already provided in Flink -->
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-libraries/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
index 35b2188..59840b0 100644
--- a/flink-libraries/flink-gelly-scala/pom.xml
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -75,13 +75,6 @@ under the License.
             <artifactId>scala-compiler</artifactId>
             <scope>provided</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-            <version>${asm.version}</version>
-            <scope>provided</scope>
-        </dependency>
         
         <!-- test dependencies -->
         

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index e37e9c5..368e526 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -62,6 +62,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-asm</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 		</dependency>
@@ -83,12 +88,6 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm-all</artifactId>
-			<version>${asm.version}</version>
-		</dependency>
 		
 		<dependency>
 			<groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
index 8fef204..3a521a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.runtime.util;
 
-import org.objectweb.asm.AnnotationVisitor;
-import org.objectweb.asm.ClassVisitor;
-import org.objectweb.asm.Opcodes;
-import org.objectweb.asm.FieldVisitor;
-import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.Type;
-import org.objectweb.asm.TypePath;
-import org.objectweb.asm.Label;
-import org.objectweb.asm.signature.SignatureReader;
-import org.objectweb.asm.signature.SignatureVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.AnnotationVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.ClassVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Opcodes;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.FieldVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.MethodVisitor;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Type;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.TypePath;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Label;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.signature.SignatureReader;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.signature.SignatureVisitor;
 
 import java.util.HashSet;
 import java.util.Set;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index c877d74..ad7906a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.runtime.util;
 
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.Opcodes;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader;
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Opcodes;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 654e15c..c3cc554 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -46,6 +46,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-asm</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-reflect</artifactId>
 		</dependency>
@@ -60,12 +65,6 @@ under the License.
 			<artifactId>scala-compiler</artifactId>
 		</dependency>
 
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
-			<version>${asm.version}</version>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index 53bffff..7965346 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory
 import scala.collection.mutable.Map
 import scala.collection.mutable.Set
 
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.objectweb.asm.Opcodes._
+import org.apache.flink.shaded.asm5.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
+import org.apache.flink.shaded.asm5.org.objectweb.asm.Opcodes._
 
 /* This code is originally from the Apache Spark project. */
 @Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
index fcc2759..b539f96 100644
--- a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
+++ b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
@@ -66,7 +66,6 @@ under the License.
 							<artifactSet combine.self="override">
 								<includes>
 									<include>com.google.guava:*</include>
-									<include>org.ow2.asm:*</include>
 									<include>org.apache.curator:*</include>
 								</includes>
 							</artifactSet>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 57ab1cb..df58d6d 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -63,12 +63,6 @@ under the License.
 			<artifactId>scala-compiler</artifactId>
 		</dependency>
 
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
-			<version>${asm.version}</version>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bb157e9..6ed08fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@ under the License.
 		<scala.version>2.11.11</scala.version>
 		<scala.binary.version>2.11</scala.binary.version>
 		<chill.version>0.7.4</chill.version>
-		<asm.version>5.0.4</asm.version>
 		<zookeeper.version>3.4.10</zookeeper.version>
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.7.4</jackson.version>
@@ -256,6 +255,12 @@ under the License.
 				<version>3.3.2</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.apache.flink</groupId>
+				<artifactId>flink-shaded-asm</artifactId>
+				<version>5.0.4-1.0</version>
+			</dependency>
+
 			<!-- Make sure we use a consistent avro version throughout the project -->
 			<dependency>
 				<groupId>org.apache.avro</groupId>
@@ -1255,7 +1260,6 @@ under the License.
 									-->
 									<include>org.apache.flink:force-shading</include>
 									<include>com.google.guava:*</include>
-									<include>org.ow2.asm:*</include>
 								</includes>
 							</artifactSet>
 							<relocations>
@@ -1267,10 +1271,6 @@ under the License.
 										<exclude>com.google.inject.**</exclude>
 									</excludes>
 								</relocation>
-								<relocation>
-									<pattern>org.objectweb.asm</pattern>
-									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
-								</relocation>
 							</relocations>
 							<transformers>
 								<!-- The service transformer is needed to merge META-INF/services files -->

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index ef383f3..3f78054 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -211,7 +211,7 @@ This file is based on the checkstyle file of Apache Beam.
     </module>
 
     <module name="IllegalImport">
-      <property name="illegalPkgs" value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged, io.netty"/>
+      <property name="illegalPkgs" value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged, io.netty, org.objectweb.asm"/>
     </module>
 
     <module name="RedundantModifier">

http://git-wip-us.apache.org/repos/asf/flink/blob/65391805/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index fca3c7d..a379845 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -272,7 +272,7 @@ check_shaded_artifacts() {
 	ASM=`cat allClasses | grep '^org/objectweb/asm/' | wc -l`
 	if [ $ASM != "0" ]; then
 		echo "=============================================================================="
-		echo "Detected $ASM asm dependencies in fat jar"
+		echo "Detected $ASM unshaded asm dependencies in fat jar"
 		echo "=============================================================================="
 		return 1
 	fi


[2/3] flink git commit: [FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options

Posted by ch...@apache.org.
[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options

This closes #4075.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63d704e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63d704e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63d704e

Branch: refs/heads/master
Commit: d63d704efb2bc28dd7a33ee9027f4d447acbd209
Parents: 6539180
Author: zjureel <zj...@gmail.com>
Authored: Thu Jun 8 11:38:56 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Aug 10 11:36:30 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  89 ++++++++++++++--
 .../configuration/ResourceManagerOptions.java   |  39 +++++++
 .../flink/mesos/configuration/MesosOptions.java | 106 +++++++++++++++++++
 .../MesosApplicationMasterRunner.java           |  31 +++---
 .../MesosFlinkResourceManager.java              |  10 +-
 .../flink/mesos/util/MesosArtifactServer.java   |   5 +-
 .../MesosFlinkResourceManagerTest.java          |   6 +-
 .../ContaineredTaskManagerParameters.java       |  14 ++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   8 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  19 ++--
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  10 +-
 .../main/java/org/apache/flink/yarn/Utils.java  |  17 +--
 .../flink/yarn/YarnApplicationMasterRunner.java |  17 +--
 .../flink/yarn/YarnFlinkResourceManager.java    |  10 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   3 +-
 .../yarn/configuration/YarnConfigOptions.java   |  65 ++++++++++++
 .../yarn/entrypoint/YarnEntrypointUtils.java    |  13 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |   5 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |   3 +-
 22 files changed, 375 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f817344..4c6c62a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -135,7 +135,9 @@ public final class ConfigConstants {
 	/**
 	 * The config parameter defining the network port to connect to
 	 * for communication with the resource manager.
+	 * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
 	 */
+	@Deprecated
 	public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
 
 	/**
@@ -349,12 +351,16 @@ public final class ConfigConstants {
 	/**
 	 * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
 	 * for other JVM memory usage.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";
 
 	/**
 	 * Minimum amount of heap memory to remove in containers, as a safety margin.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
 
 	/**
@@ -362,13 +368,17 @@ public final class ConfigConstants {
 	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
 	 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 	 * in the flink-conf.yaml.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
 
 	/**
 	 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
 	 * setting custom environment variables for the workers (TaskManagers)
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
 
 	
@@ -376,7 +386,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The vcores exposed by YARN.
+	 * @deprecated in favor of {@code YarnConfigOptions#VCORES}.
 	 */
+	@Deprecated
 	public static final String YARN_VCORES = "yarn.containers.vcores";
 
 	/**
@@ -406,7 +418,9 @@ public final class ConfigConstants {
 	 * the YARN session / job on YARN.
 	 *
 	 * By default, we take the number of of initially requested containers.
+	 * @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}.
 	 */
+	@Deprecated
 	public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";
 
 	/**
@@ -414,14 +428,18 @@ public final class ConfigConstants {
 	 * availability mode. This value is usually limited by YARN.
 	 *
 	 * By default, it's 1 in the standalone case and 2 in the high availability case.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
 
 	/**
 	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
 	 *
 	 * The default value is 5 (seconds).
+	 * @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
 	 */
+	@Deprecated
 	public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";
 
 	/**
@@ -429,8 +447,10 @@ public final class ConfigConstants {
 	 * processing slots is written into a properties file, so that the Flink client is able
 	 * to pick those details up.
 	 * This configuration parameter allows changing the default location of that file (for example
-	 * for environments sharing a Flink installation between users)
+	 * for environments sharing a Flink installation between users).
+	 * @deprecated in favor of {@code YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 	 */
+	@Deprecated
 	public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
 
 	/**
@@ -474,12 +494,16 @@ public final class ConfigConstants {
 	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
 	 *
 	 * Setting the port to 0 will let the OS choose an available port.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
 
 	/**
 	 * A comma-separated list of strings to use as YARN application tags.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_TAGS = "yarn.tags";
 
 
@@ -487,7 +511,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The initial number of Mesos tasks to allocate.
+	 * @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}.
 	 */
+	@Deprecated
 	public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
 
 	/**
@@ -495,7 +521,9 @@ public final class ConfigConstants {
 	 * the Mesos session / job on Mesos.
 	 *
 	 * By default, we take the number of of initially requested tasks.
+	 * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
 	 */
+	@Deprecated
 	public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks";
 
 	/**
@@ -510,36 +538,53 @@ public final class ConfigConstants {
 	 *     file:///path/to/file (where file contains one of the above)
 	 * }
 	 * </pre>
-	 *
+	 * @deprecated in favor of {@code MesosOptions#MASTER_URL}.
 	 */
+	@Deprecated
 	public static final String MESOS_MASTER_URL = "mesos.master";
 
 	/**
 	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
 	 *
 	 * The default value is 600 (seconds).
+	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
 	 */
+	@Deprecated
 	public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout";
 
 	/**
 	 * The config parameter defining the Mesos artifact server port to use.
 	 * Setting the port to 0 will let the OS choose an available port.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
 	 */
+	@Deprecated
 	public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
 
 	/**
 	 * Config parameter to override SSL support for the Artifact Server
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
 	 */
+	@Deprecated
 	public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
 
 	// ------------------------ Hadoop Configuration ------------------------
@@ -1218,7 +1263,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The default network port of the resource manager.
+	 * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
 	 */
+	@Deprecated
 	public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
 	/**
@@ -1378,13 +1425,17 @@ public final class ConfigConstants {
 	/**
 	 * Minimum amount of memory to subtract from the process memory to get the TaskManager
 	 * heap size. We came up with these values experimentally.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
 	 */
+	@Deprecated
 	public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;
 
 	/**
 	 * Relative amount of memory to subtract from Java process memory to get the TaskManager
-	 * heap size
+	 * heap size.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
 	 */
+	@Deprecated
 	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 
 	/**
@@ -1395,31 +1446,49 @@ public final class ConfigConstants {
 
 	/**
 	 * Default port for the application master is 0, which means
-	 * the operating system assigns an ephemeral port
+	 * the operating system assigns an ephemeral port.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
 	 */
+	@Deprecated
 	public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
 	// ------ Mesos-Specific Configuration ------
 	// For more configuration entries please see {@code MesosTaskManagerParameters}.
 
-	/** The default failover timeout provided to Mesos (10 mins) */
+	/**
+	 * The default failover timeout provided to Mesos (10 mins).
+	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
 
 	/**
 	 * The default network port to listen on for the Mesos artifact server.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
 	 */
+	@Deprecated
 	public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
 
 	/**
 	 * The default Mesos framework name for the ResourceManager to use.
+	 * @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}.
 	 */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
 
-	/** Default value to override SSL support for the Artifact Server */
+	/**
+	 * Default value to override SSL support for the Artifact Server.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 
 	// ------------------------ File System Behavior ------------------------
@@ -1659,8 +1728,16 @@ public final class ConfigConstants {
 
 	public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
 
+	/**
+	 * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+	 */
+	@Deprecated
 	public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager";
 
+	/**
+	 * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+	 */
+	@Deprecated
 	public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 6a09f19..e2d96bb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -33,6 +33,45 @@ public class ResourceManagerOptions {
 		.key("resourcemanager.job.timeout")
 		.defaultValue("5 minutes");
 
+	public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER = ConfigOptions
+		.key("local.number-resourcemanager")
+		.defaultValue(1);
+
+	public static final ConfigOption<Integer> IPC_PORT = ConfigOptions
+		.key("resourcemanager.rpc.port")
+		.defaultValue(0);
+
+	/**
+	 * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
+	 * for other JVM memory usage.
+	 */
+	public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
+		.key("containerized.heap-cutoff-ratio")
+		.defaultValue(0.25f)
+		.withDeprecatedKeys("yarn.heap-cutoff-ratio");
+
+	/**
+	 * Minimum amount of heap memory to remove in containers, as a safety margin.
+	 */
+	public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions
+		.key("containerized.heap-cutoff-min")
+		.defaultValue(600)
+		.withDeprecatedKeys("yarn.heap-cutoff-min");
+
+	/**
+	 * Prefix for passing custom environment variables to Flink's master process.
+	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
+	 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+	 * in the flink-conf.yaml.
+	 */
+	public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
+
+	/**
+	 * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
+	 * setting custom environment variables for the workers (TaskManagers).
+	 */
+	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
+	
 	// ---------------------------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
new file mode 100644
index 0000000..8616cad
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to mesos settings.
+ */
+public class MesosOptions {
+
+	/**
+	 * The initial number of Mesos tasks to allocate.
+	 */
+	public static final ConfigOption<Integer> INITIAL_TASKS =
+		key("mesos.initial-tasks")
+			.defaultValue(0);
+
+	/**
+	 * The maximum number of failed Mesos tasks before entirely stopping
+	 * the Mesos session / job on Mesos.
+	 *
+	 * <p>By default, we take the number of initially requested tasks.
+	 */
+	public static final ConfigOption<Integer> MAX_FAILED_TASKS =
+		key("mesos.maximum-failed-tasks")
+			.defaultValue(-1);
+
+	/**
+	 * The Mesos master URL.
+	 *
+	 * <p>The value should be in one of the following forms:
+	 * <pre>
+	 * {@code
+	 *     host:port
+	 *     zk://host1:port1,host2:port2,.../path
+	 *     zk://username:password@host1:port1,host2:port2,.../path
+	 *     file:///path/to/file (where file contains one of the above)
+	 * }
+	 * </pre>
+	 */
+	public static final ConfigOption<String> MASTER_URL =
+		key("mesos.master")
+			.noDefaultValue();
+
+	/**
+	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
+	 */
+	public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS =
+		key("mesos.failover-timeout")
+			.defaultValue(600);
+
+	/**
+	 * The config parameter defining the Mesos artifact server port to use.
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT =
+		key("mesos.resourcemanager.artifactserver.port")
+			.defaultValue(0);
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME =
+		key("mesos.resourcemanager.framework.name")
+			.defaultValue("Flink");
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE =
+		key("mesos.resourcemanager.framework.role")
+			.defaultValue("*");
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL =
+		key("mesos.resourcemanager.framework.principal")
+			.noDefaultValue();
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET =
+		key("mesos.resourcemanager.framework.secret")
+			.noDefaultValue();
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER =
+		key("mesos.resourcemanager.framework.user")
+			.defaultValue("");
+
+	/**
+	 * Config parameter to override SSL support for the Artifact Server.
+	 */
+	public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED =
+		key("mesos.resourcemanager.artifactserver.ssl.enabled")
+			.defaultValue(true);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index d4e2f0d..260b7f3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner {
 
 			// try to start the artifact server
 			LOG.debug("Starting Artifact Server");
-			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
-				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+			final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
 			final String artifactServerPrefix = UUID.randomUUID().toString();
 			artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
 
@@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner {
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
-		if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
-			throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
+		if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+			throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
 		}
-		String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+		String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
 
 		Duration failoverTimeout = FiniteDuration.apply(
 			flinkConfig.getInteger(
-				ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
-				ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
+				MesosOptions.FAILOVER_TIMEOUT_SECONDS),
 			TimeUnit.SECONDS);
 		frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
 
 		frameworkInfo.setName(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
 
 		frameworkInfo.setRole(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
 
 		frameworkInfo.setUser(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
 
-		if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+		if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+				MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
 
 			credential = Protos.Credential.newBuilder();
 			credential.setPrincipal(frameworkInfo.getPrincipal());
 
 			// some environments use a side-channel to communicate the secret to Mesos,
 			// and thus don't set the 'secret' configuration setting
-			if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+			if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
 				credential.setSecret(flinkConfig.getString(
-					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+					MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index d6b5c9d..05d7e1f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 				String msg = "Stopping Mesos session because the number of failed tasks ("
 					+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
 					+ maxFailedTasks + "). This number is controlled by the '"
-					+ ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+					+ MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
 					+ "By default its the number of requested tasks.";
 
 				LOG.error(msg);
@@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			Logger log) {
 
 		final int numInitialTaskManagers = flinkConfig.getInteger(
-			ConfigConstants.MESOS_INITIAL_TASKS, 0);
+			MesosOptions.INITIAL_TASKS);
 		if (numInitialTaskManagers >= 0) {
 			log.info("Mesos framework to allocate {} initial tasks",
 				numInitialTaskManagers);
 		}
 		else {
 			throw new IllegalConfigurationException("Invalid value for " +
-				ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+				MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
 		}
 
 		final int maxFailedTasks = flinkConfig.getInteger(
-			ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
+			MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
 		if (maxFailedTasks >= 0) {
 			log.info("Mesos framework tolerates {} failed tasks before giving up",
 				maxFailedTasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 2627d25..3a6f77a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.util;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 		// Config to enable https access to the artifact server
 		boolean enableSSL = config.getBoolean(
-				ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
-				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+				MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
 				SSLUtils.getSSLEnabled(config);
 
 		if (enableSSL) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index af3f7ef..8bfb4d1 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 		private static final long serialVersionUID = -952579203067648838L;
 
 		{
-			setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
-			setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+			setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
+			setInteger(MesosOptions.INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 9d679cf..7e9891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import java.util.HashMap;
@@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		// (1) compute how much memory we subtract from the total memory, to get the Java memory
 
 		final float memoryCutoffRatio = config.getFloat(
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+			ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 
 		final int minCutoff = config.getInteger(
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+			ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
 		if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
 				+ memoryCutoffRatio);
 		}
 
 		if (minCutoff >= containerMemoryMB) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
 				+ "' is larger than the total container memory " + containerMemoryMB);
 		}
 
@@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
-		final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
+		final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 		
 		for (String key : config.keySet()) {
 			if (key.startsWith(prefix) && key.length() > prefix.length()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c5c87ac..6f13b9f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -168,8 +168,7 @@ abstract class FlinkMiniCluster(
 
   def getNumberOfResourceManagers: Int = {
     originalConfiguration.getInteger(
-      ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
-      ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+      ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER
     )
   }
 
@@ -226,8 +225,8 @@ abstract class FlinkMiniCluster(
     if (useSingleActorSystem) {
       AkkaUtils.getAkkaConfig(originalConfiguration, None)
     } else {
-      val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-                                                  ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(
+        ResourceManagerOptions.IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27a8ee1..0ae00a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -183,11 +183,11 @@ class LocalFlinkMiniCluster(
     val resourceManagerName = getResourceManagerName(index)
 
     val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      ResourceManagerOptions.IPC_PORT)
 
     if(resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+      config.setInteger(ResourceManagerOptions.IPC_PORT,
+        resourceManagerPort + index)
     }
 
     val resourceManagerProps = getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 82a656c..275bcc9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
@@ -62,8 +63,8 @@ public class UtilsTest {
 	@Test
 	public void testHeapCutoff() {
 		Configuration conf = new Configuration();
-		conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
+		conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
+		conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
 
 		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
 		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
@@ -71,14 +72,14 @@ public class UtilsTest {
 		// test different configuration
 		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1");
 		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5");
 		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 
 		// test also deprecated keys
@@ -93,21 +94,21 @@ public class UtilsTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgument() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgumentNegative() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void tooMuchCutoff() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 2e88836..d85aa97 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-nm", "customName",
 				"-Dfancy-configuration-value=veryFancy",
 				"-Dyarn.maximum-failed-containers=3",
-				"-D" + ConfigConstants.YARN_VCORES + "=2"},
+				"-D" + YarnConfigOptions.VCORES.key() + "=2"},
 			"Number of connected TaskManagers changed to 1. Slots available: 3",
 			RunTypes.YARN_SESSION);
 
@@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
 			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
 			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
-			Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
+			Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
 
 			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
 			// first, get the hostname/port

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index dfc8b6a..55dc47f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// The number of cores can be configured in the config.
 		// If not configured, it is set to the number of task slots
 		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
-		int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
+		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnVcores) {
 			throw new IllegalConfigurationException(
@@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 						" but Yarn only has %d virtual cores available. Please note that the number" +
 						" of virtual cores is set to the number of task slots by default unless configured" +
 						" in the Flink config with '%s.'",
-					configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
+					configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
 		}
 
 		// check if required Hadoop environment variables are set. If not, warn user
@@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			// activate re-execution of failed applications
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(
-					ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+					YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
 					YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
 
 			activateHighAvailabilitySupport(appContext);
@@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			// set number of application retries to 1 in the default case
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(
-					ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+					YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
 					1));
 		}
 
@@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		IllegalAccessException {
 
 		final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
-		final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+		final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
 
 		final Set<String> applicationTags = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 662617f..98d27ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -82,24 +82,17 @@ public final class Utils {
 	 */
 	public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
 
-		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
-		float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-		int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+		float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+		int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
 		if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
 				+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
 		}
 		if (minCutoff > memory) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
 				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index dccbb71..e951df4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner {
 			// try to start the actor system, JobManager and JobManager actor system
 			// using the port range definition from the config.
 			final String amPortRange = config.getString(
-					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
-					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+					YarnConfigOptions.APPLICATION_MASTER_PORT);
 
 			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);
 
@@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner {
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 66e44a6..4d8142f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
@@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 			// Resource requirements for worker containers
 			int taskManagerSlots = taskManagerParameters.numSlots();
-			int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+			int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
 			Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
 
 			resourceManagerClient.addContainerRequest(
@@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 					String msg = "Stopping YARN session because the number of failed containers ("
 						+ failedContainersSoFar + ") exceeded the maximum failed containers ("
 						+ maxFailedContainers + "). This number is controlled by the '"
-						+ ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+						+ YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. "
 						+ "By default its the number of requested containers.";
 
 					LOG.error(msg);
@@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			Logger log) {
 
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-			ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+			YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
 		final long yarnExpiryIntervalMS = yarnConfig.getLong(
 			YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
@@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		}
 
 		final int maxFailedContainers = flinkConfig.getInteger(
-			ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numInitialTaskManagers);
+			YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers);
 		if (maxFailedContainers >= 0) {
 			log.info("YARN application tolerates {} failed TaskManager containers before giving up",
 				maxFailedContainers);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 8327b6a..fb1a1c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -134,7 +135,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-				ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+				YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
 		final long yarnExpiryIntervalMS = yarnConfig.getLong(
 				YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5d8abac..f2968b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
 		String currentUser = System.getProperty("user.name");
 		String propertiesFileLocation =
-			conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+			conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
 
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 28ef2ab..3773352 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -52,6 +52,71 @@ public class YarnConfigOptions {
 		key("yarn.per-job-cluster.include-user-jar")
 			.defaultValue("ORDER");
 
+	/**
+	 * The vcores exposed by YARN.
+	 */
+	public static final ConfigOption<Integer> VCORES =
+		key("yarn.containers.vcores")
+		.defaultValue(-1);
+
+	/**
+	 * The maximum number of failed YARN containers before entirely stopping
+	 * the YARN session / job on YARN.
+	 * By default, we take the number of initially requested containers.
+	 *
+	 * <p>Note: This option returns a String since Integer options must have a static default value.
+	 */
+	public static final ConfigOption<String> MAX_FAILED_CONTAINERS =
+		key("yarn.maximum-failed-containers")
+		.noDefaultValue();
+
+	/**
+	 * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
+	 * availability mode. This value is usually limited by YARN.
+	 * By default, it's 1 in the standalone case and 2 in the high availability case.
+	 *
+	 * <p>Note: This option returns a String since Integer options must have a static default value.
+	 */
+	public static final ConfigOption<String> APPLICATION_ATTEMPTS =
+		key("yarn.application-attempts")
+		.noDefaultValue();
+
+	/**
+	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
+	 */
+	public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
+		key("yarn.heartbeat-delay")
+		.defaultValue(5);
+
+	/**
+	 * When a Flink job is submitted to YARN, the JobManager's host and the number of available
+	 * processing slots is written into a properties file, so that the Flink client is able
+	 * to pick those details up.
+	 * This configuration parameter allows changing the default location of that file (for example
+	 * for environments sharing a Flink installation between users).
+	 */
+	public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
+		key("yarn.properties-file.location")
+		.noDefaultValue();
+
+	/**
+	 * The config parameter defining the Akka actor system port for the ApplicationMaster and
+	 * JobManager.
+	 * The port can either be a port, such as "9123",
+	 * a range of ports: "50100-50200"
+	 * or a list of ranges and/or ports: "50100-50200,50300-50400,51234".
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final ConfigOption<String> APPLICATION_MASTER_PORT =
+		key("yarn.application-master.port")
+		.defaultValue("0");
+
+	/**
+	 * A comma-separated list of strings to use as YARN application tags.
+	 */
+	public static final ConfigOption<String> APPLICATION_TAGS =
+		key("yarn.tags")
+		.defaultValue("");
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 9ead775..e8fccac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -112,21 +113,13 @@ public class YarnEntrypointUtils {
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
 		final String keytabPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d78b390..a2d1668 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
-import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -88,7 +89,7 @@ class YarnJobManager(
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =
     FiniteDuration(
-      flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
+      flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS),
       TimeUnit.SECONDS)
 
   val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index bcb8559..19d1af5 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	public void testConfigOverwrite() {
 		Configuration configuration = new Configuration();
 		// overwrite vcores in config
-		configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,


[3/3] flink git commit: [FLINK-7354][tests] ignore "initialSeedUniquifierGenerator" thread in thread list

Posted by ch...@apache.org.
[FLINK-7354][tests] ignore "initialSeedUniquifierGenerator" thread in thread list

Netty's ThreadLocalRandom class may spawn an "initialSeedUniquifierGenerator"
thread because of its use of a secure random generator. Our
LocalFlinkMiniClusterITCase may then report this thread as not having been
stopped. Let's tolerate this.

Alternatively, we could solve this similarly to
https://issues.apache.org/jira/browse/SOLR-10098 by calling
ThreadLocalRandom.setInitialSeedUniquifier(1L),
but that may be less future-proof or may even remove some randomness.

This closes #4464.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6126e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6126e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6126e7c

Branch: refs/heads/master
Commit: d6126e7ca7a973635cc0d4ebaff52f35653df503
Parents: d63d704
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Aug 2 16:20:02 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Aug 10 11:36:30 2017 +0200

----------------------------------------------------------------------
 .../test/runtime/minicluster/LocalFlinkMiniClusterITCase.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6126e7c/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 40c4e84..770b88c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -49,7 +49,12 @@ import static org.junit.Assert.fail;
  */
 public class LocalFlinkMiniClusterITCase extends TestLogger {
 
-	private static final String[] ALLOWED_THREAD_PREFIXES = { };
+	private static final String[] ALLOWED_THREAD_PREFIXES = {
+		// This is a daemon thread spawned by netty's ThreadLocalRandom class if no
+		// initialSeedUniquifier is set yet and it is sometimes spawned before this test and
+		// sometimes during this test.
+		"initialSeedUniquifierGenerator"
+	};
 
 	@Test
 	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {