You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/04/21 12:24:15 UTC

[01/13] flink git commit: [FLINK-5623] [runtime] Fix TempBarrier dam has been closed

Repository: flink
Updated Branches:
  refs/heads/master f9eac3afd -> 83061ad0f


[FLINK-5623] [runtime] Fix TempBarrier dam has been closed

Properly reset the "pipeline breaker" upon closing.

This closes #3747


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9746846
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9746846
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9746846

Branch: refs/heads/master
Commit: c9746846b357d8ce538ff872cea60c52b1904b43
Parents: f9eac3a
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Apr 20 08:46:01 2017 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:15:26 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/graph/library/link_analysis/PageRank.java  | 5 -----
 .../main/java/org/apache/flink/runtime/operators/BatchTask.java | 1 +
 2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index c5c4178..747735e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -45,7 +45,6 @@ import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.PageRank.Result;
 import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.DoubleValue;
@@ -176,10 +175,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.run(new VertexDegrees<K, VV, EV>()
 				.setParallelism(parallelism));
 
-		// prevent Exception "The dam has been closed." in TempBarrier
-		// for a simplified Graph as in PageRankITCase (see FLINK-5623)
-		vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>());
-
 		// vertex count
 		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index f748079..87b0a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -884,6 +884,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 					// close the async barrier if there is one
 					if (this.tempBarriers[i] != null) {
 						this.tempBarriers[i].close();
+						this.tempBarriers[i] = null;
 					}
 
 					// recreate the local strategy


[05/13] flink git commit: [misc] Closing commit for outdated pull request

Posted by se...@apache.org.
[misc] Closing commit for outdated pull request

This closes #3592


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a70524a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a70524a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a70524a

Branch: refs/heads/master
Commit: 7a70524acfe85c5b634f18f3cbe6dd32be302e37
Parents: eef85e0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 11:05:00 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 11:05:00 2017 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[11/13] flink git commit: [hotfix] [docs] Remove empty Docker docs page

Posted by se...@apache.org.
[hotfix] [docs] Remove empty Docker docs page

This will be re-added when we have a coherent plan for Docker support
and after the official Docker images are available.

This closes #3658


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918d25ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918d25ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918d25ef

Branch: refs/heads/master
Commit: 918d25efffc3a59347cc5ac96e56f9eadff2d716
Parents: 3cad309
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Mar 31 17:00:21 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:35:06 2017 +0200

----------------------------------------------------------------------
 docs/setup/docker.md | 29 -----------------------------
 1 file changed, 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/918d25ef/docs/setup/docker.md
----------------------------------------------------------------------
diff --git a/docs/setup/docker.md b/docs/setup/docker.md
deleted file mode 100644
index 8b9757f..0000000
--- a/docs/setup/docker.md
+++ /dev/null
@@ -1,29 +0,0 @@
----
-title:  "Docker Setup"
-nav-title: Docker
-nav-parent_id: deployment
-nav-pos: 4
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-


[07/13] flink git commit: [hotfix] [rocksdb] Convert performance benchmarks to unit tests

Posted by se...@apache.org.
[hotfix] [rocksdb] Convert performance benchmarks to unit tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05065451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05065451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05065451

Branch: refs/heads/master
Commit: 05065451abf5917f796b00692f43f0a2d2f3ed48
Parents: ea054a7
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 12:19:04 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:19:04 2017 +0200

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 111 ----------
 .../ListViaRangeSpeedMiniBenchmark.java         | 132 ------------
 .../state/benchmark/RocksDBPerformanceTest.java | 204 +++++++++++++++++++
 3 files changed, 204 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
deleted file mode 100644
index f3e084f..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-
-public class ListViaMergeSpeedMiniBenchmark {
-	
-	public static void main(String[] args) throws Exception {
-		final File rocksDir = new File("/tmp/rdb");
-		FileUtils.deleteDirectory(rocksDir);
-
-		final Options options = new Options()
-				.setCompactionStyle(CompactionStyle.LEVEL)
-				.setLevelCompactionDynamicLevelBytes(true)
-				.setIncreaseParallelism(4)
-				.setUseFsync(false)
-				.setMaxOpenFiles(-1)
-				.setDisableDataSync(true)
-				.setCreateIfMissing(true)
-				.setMergeOperator(new StringAppendOperator());
-
-		final WriteOptions write_options = new WriteOptions()
-				.setSync(false)
-				.setDisableWAL(true);
-
-		final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
-
-		final String key = "key";
-		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
-		try {
-			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
-			final int num = 50000;
-
-			// ----- insert -----
-			System.out.println("begin insert");
-
-			final long beginInsert = System.nanoTime();
-			for (int i = 0; i < num; i++) {
-				rocksDB.merge(write_options, keyBytes, valueBytes);
-			}
-			final long endInsert = System.nanoTime();
-			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
-			// ----- read (attempt 1) -----
-
-			final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
-			final long beginGet1 = System.nanoTime();
-			rocksDB.get(keyBytes, resultHolder);
-			final long endGet1 = System.nanoTime();
-
-			System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
-
-			// ----- read (attempt 2) -----
-
-			final long beginGet2 = System.nanoTime();
-			rocksDB.get(keyBytes, resultHolder);
-			final long endGet2 = System.nanoTime();
-
-			System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
-
-			// ----- compact -----
-			System.out.println("compacting...");
-			final long beginCompact = System.nanoTime();
-			rocksDB.compactRange();
-			final long endCompact = System.nanoTime();
-
-			System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
-
-			// ----- read (attempt 3) -----
-
-			final long beginGet3 = System.nanoTime();
-			rocksDB.get(keyBytes, resultHolder);
-			final long endGet3 = System.nanoTime();
-
-			System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
-		} finally {
-			rocksDB.close();
-			options.close();
-			write_options.close();
-			FileUtils.deleteDirectory(rocksDir);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
deleted file mode 100644
index f46e2cd..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-import sun.misc.Unsafe;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-
-public class ListViaRangeSpeedMiniBenchmark {
-	
-	public static void main(String[] args) throws Exception {
-		final File rocksDir = new File("/tmp/rdb");
-		FileUtils.deleteDirectory(rocksDir);
-
-		final Options options = new Options()
-				.setCompactionStyle(CompactionStyle.LEVEL)
-				.setLevelCompactionDynamicLevelBytes(true)
-				.setIncreaseParallelism(4)
-				.setUseFsync(false)
-				.setMaxOpenFiles(-1)
-				.setDisableDataSync(true)
-				.setCreateIfMissing(true)
-				.setMergeOperator(new StringAppendOperator());
-
-		final WriteOptions write_options = new WriteOptions()
-				.setSync(false)
-				.setDisableWAL(true);
-
-		final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
-
-		final String key = "key";
-		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-		
-		try {
-
-			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
-			final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
-
-			final Unsafe unsafe = MemoryUtils.UNSAFE;
-			final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
-
-			final int num = 50000;
-			System.out.println("begin insert");
-
-			final long beginInsert = System.nanoTime();
-			for (int i = 0; i < num; i++) {
-				unsafe.putInt(keyTemplate, offset, i);
-				rocksDB.put(write_options, keyTemplate, valueBytes);
-			}
-			final long endInsert = System.nanoTime();
-			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
-			final byte[] resultHolder = new byte[num * valueBytes.length];
-
-			final long beginGet = System.nanoTime();
-
-			final RocksIterator iterator = rocksDB.newIterator();
-			int pos = 0;
-
-			try {
-				// seek to start
-				unsafe.putInt(keyTemplate, offset, 0);
-				iterator.seek(keyTemplate);
-
-				// mark end
-				unsafe.putInt(keyTemplate, offset, -1);
-
-				// iterate
-				while (iterator.isValid()) {
-					byte[] currKey = iterator.key();
-					if (samePrefix(keyBytes, currKey)) {
-						byte[] currValue = iterator.value();
-						System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
-						pos += currValue.length;
-						iterator.next();
-					} else {
-						break;
-					}
-				}
-			}finally {
-				iterator.close();
-			}
-
-			final long endGet = System.nanoTime();
-
-			System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
-		} finally {
-			rocksDB.close();
-			options.close();
-			write_options.close();
-			FileUtils.deleteDirectory(rocksDir);
-		}
-	}
-
-	private static boolean samePrefix(byte[] prefix, byte[] key) {
-		for (int i = 0; i < prefix.length; i++) {
-			if (prefix[i] != key [i]) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
new file mode 100644
index 0000000..011703e
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Test that validates that the performance of RocksDB is as expected.
+ * This test guards against the bug filed as 'FLINK-5756'
+ */
+public class RocksDBPerformanceTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder TMP = new TemporaryFolder();
+
+	@Test(timeout = 2000)
+	public void testRocksDbMergePerformance() throws Exception {
+		final File rocksDir = TMP.newFolder("rdb");
+
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final int num = 50000;
+
+		try (
+			final Options options = new Options()
+					.setCompactionStyle(CompactionStyle.LEVEL)
+					.setLevelCompactionDynamicLevelBytes(true)
+					.setIncreaseParallelism(4)
+					.setUseFsync(false)
+					.setMaxOpenFiles(-1)
+					.setDisableDataSync(true)
+					.setCreateIfMissing(true)
+					.setMergeOperator(new StringAppendOperator());
+
+			final WriteOptions write_options = new WriteOptions()
+					.setSync(false)
+					.setDisableWAL(true);
+
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
+		{
+			// ----- insert -----
+			log.info("begin insert");
+
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				rocksDB.merge(write_options, keyBytes, valueBytes);
+			}
+			final long endInsert = System.nanoTime();
+			log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
+
+			// ----- read (attempt 1) -----
+
+			final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+			final long beginGet1 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet1 = System.nanoTime();
+
+			log.info("end get - duration: {} ms", (endGet1 - beginGet1) / 1_000_000);
+
+			// ----- read (attempt 2) -----
+
+			final long beginGet2 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet2 = System.nanoTime();
+
+			log.info("end get - duration: {} ms", (endGet2 - beginGet2) / 1_000_000);
+
+			// ----- compact -----
+			log.info("compacting...");
+			final long beginCompact = System.nanoTime();
+			rocksDB.compactRange();
+			final long endCompact = System.nanoTime();
+
+			log.info("end compaction - duration: {} ms", (endCompact - beginCompact) / 1_000_000);
+
+			// ----- read (attempt 3) -----
+
+			final long beginGet3 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet3 = System.nanoTime();
+
+			log.info("end get - duration: {} ms", (endGet3 - beginGet3) / 1_000_000);
+		}
+	}
+
+	@Test(timeout = 2000)
+	public void testRocksDbRangeGetPerformance() throws Exception {
+		final File rocksDir = TMP.newFolder("rdb");
+
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final int num = 50000;
+
+		try (
+			final Options options = new Options()
+					.setCompactionStyle(CompactionStyle.LEVEL)
+					.setLevelCompactionDynamicLevelBytes(true)
+					.setIncreaseParallelism(4)
+					.setUseFsync(false)
+					.setMaxOpenFiles(-1)
+					.setDisableDataSync(true)
+					.setCreateIfMissing(true)
+					.setMergeOperator(new StringAppendOperator());
+
+			final WriteOptions write_options = new WriteOptions()
+					.setSync(false)
+					.setDisableWAL(true);
+
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
+		{
+			final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+			final Unsafe unsafe = MemoryUtils.UNSAFE;
+			final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+			log.info("begin insert");
+
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				unsafe.putInt(keyTemplate, offset, i);
+				rocksDB.put(write_options, keyTemplate, valueBytes);
+			}
+			final long endInsert = System.nanoTime();
+			log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
+
+			@SuppressWarnings("MismatchedReadAndWriteOfArray")
+			final byte[] resultHolder = new byte[num * valueBytes.length];
+
+			final long beginGet = System.nanoTime();
+
+			int pos = 0;
+
+			try (final RocksIterator iterator = rocksDB.newIterator()) {
+				// seek to start
+				unsafe.putInt(keyTemplate, offset, 0);
+				iterator.seek(keyTemplate);
+
+				// iterate
+				while (iterator.isValid() && samePrefix(keyBytes, iterator.key())) {
+					byte[] currValue = iterator.value();
+					System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+					pos += currValue.length;
+					iterator.next();
+				}
+			}
+
+			final long endGet = System.nanoTime();
+
+			log.info("end get - duration: {} ms", (endGet - beginGet) / 1_000_000);
+		}
+	}
+
+
+	private static boolean samePrefix(byte[] prefix, byte[] key) {
+		for (int i = 0; i < prefix.length; i++) {
+			if (prefix[i] != key [i]) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+}


[08/13] flink git commit: [FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries

Posted by se...@apache.org.
[FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries

This closes #3706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ea74a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ea74a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ea74a0

Branch: refs/heads/master
Commit: c0ea74a0a4eedf4fe73e8f383e837ea373216476
Parents: 0506545
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Mon Apr 10 16:57:48 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:22:00 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  5 ++--
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 28 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index ccc6884..43c2aca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -274,7 +274,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	public void release(final K key, final V value, final long timestamp) {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
 		if (entry != null) {
-			entry.decreaseReferenceCounter();
 			internalRemove(entry);
 		}
 	}
@@ -493,13 +492,13 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		while (!entriesToRemove.isEmpty()) {
 			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
+			currentEntry.decreaseReferenceCounter();
 
 			if (currentEntry.getReferenceCounter() == 0) {
 				currentEntry.remove();
 
-				for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
+				for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
 					if (edge.getTarget() != null) {
-						edge.getTarget().decreaseReferenceCounter();
 						entriesToRemove.push(edge.getTarget());
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index f0a25d2..adc07b3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SharedBufferTest extends TestLogger {
@@ -138,4 +139,31 @@ public class SharedBufferTest extends TestLogger {
 
 		assertEquals(sharedBuffer, copy);
 	}
+
+	@Test
+	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
+		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+		}
+
+		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
+		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
+		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
+
+		//simulate IGNORE (next event can point to events[2])
+		sharedBuffer.lock("branching", events[2], timestamp);
+
+		sharedBuffer.release("branching", events[4], timestamp);
+
+		//There should be still events[1] and events[2] in the buffer
+		assertFalse(sharedBuffer.isEmpty());
+	}
 }


[04/13] flink git commit: [FLINK-6117] [security] Make setting of 'zookeeper.sasl.disable' work correctly

Posted by se...@apache.org.
[FLINK-6117] [security] Make setting of 'zookeeper.sasl.disable' work correctly

This closes #3600


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eef85e09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eef85e09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eef85e09

Branch: refs/heads/master
Commit: eef85e095a8a0e4c4553631b74ba7b9f173cebf0
Parents: daf4038
Author: zcb <20...@qq.com>
Authored: Thu Mar 23 03:44:10 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:37:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/configuration/SecurityOptions.java   |  4 ++++
 .../org/apache/flink/runtime/security/SecurityUtils.java  |  7 +++++++
 .../flink/runtime/security/modules/ZooKeeperModule.java   | 10 ++++++++++
 .../org/apache/flink/test/util/SecureTestEnvironment.java |  1 +
 4 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 95cf0c7..3763198 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -55,6 +55,10 @@ public class SecurityOptions {
 	//  ZooKeeper Security Options
 	// ------------------------------------------------------------------------
 
+	public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+		key("zookeeper.sasl.disable")
+			.defaultValue(false);
+
 	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
 		key("zookeeper.sasl.service-name")
 			.defaultValue("zookeeper");

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d76e7a5..7a09c32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -125,6 +125,8 @@ public class SecurityUtils {
 
 		private final org.apache.hadoop.conf.Configuration hadoopConf;
 
+		private final boolean isZkSaslDisable;
+
 		private final boolean useTicketCache;
 
 		private final String keytab;
@@ -164,6 +166,7 @@ public class SecurityUtils {
 				org.apache.hadoop.conf.Configuration hadoopConf,
 				List<? extends Class<? extends SecurityModule>> securityModules) {
 			this.hadoopConf = checkNotNull(hadoopConf);
+			this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
 			this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 			this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
 			this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
@@ -175,6 +178,10 @@ public class SecurityUtils {
 			validate();
 		}
 
+		public boolean isZkSaslDisable() {
+			return isZkSaslDisable;
+		}
+
 		public String getKeytab() {
 			return keytab;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index c0ba4a5..216bdde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -41,6 +41,8 @@ public class ZooKeeperModule implements SecurityModule {
 	 */
 	private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
 
+	private String priorSaslEnable;
+
 	private String priorServiceName;
 
 	private String priorLoginContextName;
@@ -48,6 +50,9 @@ public class ZooKeeperModule implements SecurityModule {
 	@Override
 	public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
 
+		priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
+		System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!configuration.isZkSaslDisable()));
+
 		priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
 		if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
 			System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
@@ -61,6 +66,11 @@ public class ZooKeeperModule implements SecurityModule {
 
 	@Override
 	public void uninstall() throws SecurityInstallException {
+		if(priorSaslEnable != null) {
+			System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
+		} else {
+			System.clearProperty(ZK_ENABLE_CLIENT_SASL);
+		}
 		if(priorServiceName != null) {
 			System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index febd074..98deee6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -138,6 +138,7 @@ public class SecureTestEnvironment {
 			//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
 			//See Yarn test case module for reference
 			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
+			flinkConfig.setBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE, false);
 			flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
 			flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
 			flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);


[12/13] flink git commit: [FLINK-6236] [docs] Mention that resume from savepoint is possible via the web UI

Posted by se...@apache.org.
[FLINK-6236] [docs] Mention that resume from savepoint is possible via the web UI

This closes #3657


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6a1551f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6a1551f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6a1551f

Branch: refs/heads/master
Commit: d6a1551f90edfa534a57244a3be46e54e99dfeae
Parents: 918d25e
Author: rami-alisawi <ra...@users.noreply.github.com>
Authored: Fri Mar 31 17:11:29 2017 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:39:05 2017 +0200

----------------------------------------------------------------------
 docs/setup/savepoints.md | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6a1551f/docs/setup/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md
index da0784d..4bdc43f 100644
--- a/docs/setup/savepoints.md
+++ b/docs/setup/savepoints.md
@@ -69,6 +69,8 @@ In the above example, the print sink is stateless and hence not part of the save
 
 You can use the [command line client]({{ site.baseurl }}/setup/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
 
+With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the web UI.
+
 ### Triggering Savepoints
 
 When triggering a savepoint, a single savepoint file will be created that contains the checkpoint *meta data*. The actual checkpoint state will be kept around in the configured checkpoint directory. For example with a `FsStateBackend` or `RocksDBStateBackend`:


[03/13] flink git commit: [FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically

Posted by se...@apache.org.
[FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically

Sorts files read from Flink's lib directory and places the distribution
JAR to the end of the CLASSPATH.

This closes #3632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daf4038c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daf4038c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daf4038c

Branch: refs/heads/master
Commit: daf4038c88b459084a1232c69fb584a7e2e100da
Parents: f75466f
Author: Greg Hogan <co...@greghogan.com>
Authored: Sun Mar 26 15:46:00 2017 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:30:44 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 18 +++++++++--
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh | 17 +----------
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    | 17 +----------
 .../src/main/flink-bin/yarn-bin/yarn-session.sh | 17 +----------
 .../yarn/AbstractYarnClusterDescriptor.java     | 32 +++++++++++---------
 5 files changed, 35 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 5ef4ef7..dbfdd0e 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -18,16 +18,28 @@
 ################################################################################
 
 constructFlinkClassPath() {
+    local FLINK_DIST
+    local FLINK_CLASSPATH
 
     while read -d '' -r jarfile ; do
-        if [[ $FLINK_CLASSPATH = "" ]]; then
+        if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then
+            FLINK_DIST="$FLINK_DIST":"$jarfile"
+        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
             FLINK_CLASSPATH="$jarfile";
         else
             FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
         fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
 
-    echo $FLINK_CLASSPATH
+    if [[ "$FLINK_DIST" == "" ]]; then
+        # write error message to stderr since stdout is stored as the classpath
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+        # exit function with empty classpath to force process failure
+        exit 1
+    fi
+
+    echo "$FLINK_CLASSPATH""$FLINK_DIST"
 }
 
 # These are used to mangle paths that are passed to java when using

http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
index 6fae6c2..67eab9d 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -23,26 +23,11 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink AppMaster
-constructAppMasterClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
 if [ "$FLINK_IDENT_STRING" = "" ]; then
     FLINK_IDENT_STRING="$USER"
 fi
 
-CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
index 23a301b..ab2f7b1 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -23,22 +23,7 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink TaskManager
-constructTaskManagerClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=flink-taskmanager.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 1755f32..03b1e3a 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -29,22 +29,7 @@ fi
 
 JVM_ARGS="$JVM_ARGS -Xmx512m"
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink CLI client
-constructCLIClientClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ec7af5a..a5a6c36 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,10 +23,10 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -63,10 +63,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
@@ -669,12 +669,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size());
 		// list of remote paths (after upload)
 		final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size());
-		// classpath assembler
-		final StringBuilder classPathBuilder = new StringBuilder();
 		// ship list that enables reuse of resources for task manager containers
 		StringBuilder envShipFileList = new StringBuilder();
 
 		// upload and register ship files
+		final List<String> classPaths = new ArrayList<>();
 		for (File shipFile : effectiveShipFiles) {
 			LocalResource shipResources = Records.newRecord(LocalResource.class);
 
@@ -693,29 +692,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 				Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
 					@Override
-					public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs)
+					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
 							throws IOException {
-						super.preVisitDirectory(dir, attrs);
-
-						java.nio.file.Path relativePath = parentPath.relativize(dir);
+						java.nio.file.Path relativePath = parentPath.relativize(file);
 
-						classPathBuilder
-							.append(relativePath)
-							.append(File.separator)
-							.append("*")
-							.append(File.pathSeparator);
+						classPaths.add(relativePath.toString());
 
 						return FileVisitResult.CONTINUE;
 					}
 				});
 			} else {
 				// add files to the classpath
-				classPathBuilder.append(shipFile.getName()).append(File.pathSeparator);
+				classPaths.add(shipFile.getName());
 			}
 
 			envShipFileList.append(remotePath).append(",");
 		}
 
+		// normalize classpath by sorting
+		Collections.sort(classPaths);
+
+		// classpath assembler
+		StringBuilder classPathBuilder = new StringBuilder();
+		for (String classPath : classPaths) {
+			classPathBuilder.append(classPath).append(File.pathSeparator);
+		}
+
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);


[09/13] flink git commit: [FLINK-4769] [metrics] Port metric config parameters to ConfigOptions

Posted by se...@apache.org.
[FLINK-4769] [metrics] Port metric config parameters to ConfigOptions

This closes #3687


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b77cc3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b77cc3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b77cc3f

Branch: refs/heads/master
Commit: 1b77cc3f2682afd839ce6b16cd1658bbd01a6b04
Parents: c0ea74a
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 13:47:33 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:31:30 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../flink/configuration/ConfigConstants.java    | 44 +++++-----
 .../flink/configuration/MetricOptions.java      | 88 ++++++++++++++++++++
 .../ScheduledDropwizardReporterTest.java        |  7 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  3 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  9 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  5 +-
 .../metrics/statsd/StatsDReporterTest.java      | 11 +--
 .../metrics/MetricRegistryConfiguration.java    | 24 ++----
 .../runtime/metrics/scope/ScopeFormat.java      | 49 -----------
 .../runtime/metrics/scope/ScopeFormats.java     | 32 +++----
 .../runtime/metrics/MetricRegistryTest.java     | 29 +++----
 .../metrics/groups/AbstractMetricGroupTest.java |  7 +-
 .../metrics/groups/JobManagerGroupTest.java     |  4 +-
 .../metrics/groups/JobManagerJobGroupTest.java  | 10 +--
 .../groups/MetricGroupRegistrationTest.java     |  3 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  4 +-
 .../metrics/groups/TaskManagerJobGroupTest.java | 10 +--
 .../metrics/groups/TaskMetricGroupTest.java     | 10 +--
 .../api/operators/AbstractStreamOperator.java   |  8 +-
 20 files changed, 196 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 0c6bfa9..a21a239 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -116,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
-		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+		flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		return flinkConfig;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a035beb..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -990,21 +990,8 @@ public final class ConfigConstants {
 
 	// ---------------------------- Metrics -----------------------------------
 
-	/**
-	 * The list of named reporters. Names are defined here and per-reporter configs
-	 * are given with the reporter config prefix and the reporter name.
-	 *
-	 * Example:
-	 * <pre>{@code
-	 * metrics.reporters = foo, bar
-	 *
-	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
-	 * metrics.reporter.foo.interval = 10
-	 *
-	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
-	 * metrics.reporter.bar.port = 1337
-	 * }</pre>
-	 */
+	/** @deprecated Use {@link MetricOptions#REPORTERS_LIST} instead. */
+	@Deprecated
 	public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
 
 	/**
@@ -1022,28 +1009,36 @@ public final class ConfigConstants {
 	/**	The delimiter used to assemble the metric identifier. This is used as a suffix in an actual reporter config. */
 	public static final String METRICS_REPORTER_SCOPE_DELIMITER = "scope.delimiter";
 
-	/** The delimiter used to assemble the metric identifier. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_DELIMITER} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter";
 
-	/** The scope format string that is applied to all metrics scoped to a JobManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
 
-	/** The scope format string that is applied to all metrics scoped to a TaskManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
 
-	/** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM_JOB} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
 
-	/** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM_JOB} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
 
-	/** The scope format string that is applied to all metrics scoped to a task. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TASK} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
 
-	/** The scope format string that is applied to all metrics scoped to an operator. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_OPERATOR} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
 
-	/** The number of measured latencies to maintain at each operator */
+	/** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+	@Deprecated
 	public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size";
 
 
@@ -1498,7 +1493,8 @@ public final class ConfigConstants {
 
 	// ----------------------------- Metrics ----------------------------
 
-	/** The default number of measured latencies to maintain at each operator */
+	/** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+	@Deprecated
 	public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128;
 
 	// ----------------------------- Environment Variables ----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
new file mode 100644
index 0000000..8a328ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class MetricOptions {
+
+	/**
+	 * The list of named reporters. Names are defined here and per-reporter configs
+	 * are given with the reporter config prefix and the reporter name.
+	 *
+	 * Example:
+	 * <pre>{@code
+	 * metrics.reporters = foo, bar
+	 *
+	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
+	 * metrics.reporter.foo.interval = 10
+	 *
+	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
+	 * metrics.reporter.bar.port = 1337
+	 * }</pre>
+	 */
+	public static final ConfigOption<String> REPORTERS_LIST =
+		key("metrics.reporters")
+			.noDefaultValue();
+
+	/** The delimiter used to assemble the metric identifier. */
+	public static final ConfigOption<String> SCOPE_DELIMITER =
+		key("metrics.scope.delimiter")
+			.defaultValue(".");
+
+	/** The scope format string that is applied to all metrics scoped to a JobManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_JM =
+		key("metrics.scope.jm")
+			.defaultValue("<host>.jobmanager");
+
+	/** The scope format string that is applied to all metrics scoped to a TaskManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_TM =
+		key("metrics.scope.tm")
+			.defaultValue("<host>.taskmanager.<tm_id>");
+
+	/** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_JM_JOB =
+		key("metrics.scope.jm.job")
+			.defaultValue("<host>.jobmanager.<job_name>");
+
+	/** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_TM_JOB =
+		key("metrics.scope.tm.job")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>");
+
+	/** The scope format string that is applied to all metrics scoped to a task. */
+	public static final ConfigOption<String> SCOPE_NAMING_TASK =
+		key("metrics.scope.task")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
+
+	/** The scope format string that is applied to all metrics scoped to an operator. */
+	public static final ConfigOption<String> SCOPE_NAMING_OPERATOR =
+		key("metrics.scope.operator")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
+
+	/** The number of measured latencies to maintain at each operator */
+	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
+		key("metrics.latency.history-size")
+			.defaultValue(128);
+
+	private MetricOptions() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 87c4ccb..73e7f0b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -76,13 +77,13 @@ public class ScheduledDropwizardReporterTest {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
 
-		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
-		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+		configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
 		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 2f2a536..8f7796c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -96,7 +97,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		int size = 10;
 		String histogramMetricName = "histogram";
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+		config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 1a96287..85ab897 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -94,7 +95,7 @@ public class JMXReporterTest extends TestLogger {
 	@Test
 	public void testPortConflictHandling() throws Exception {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
@@ -154,7 +155,7 @@ public class JMXReporterTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 
-		cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
@@ -230,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -280,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 934a621..6d54db3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -55,11 +56,11 @@ public class JMXJobManagerMetricTest {
 		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 		Configuration flinkConfiguration = new Configuration();
 
-		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test");
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
 
-		flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
+		flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
 
 		TestingCluster flink = new TestingCluster(flinkConfiguration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 17fd65a..9215c3f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.statsd;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
@@ -75,13 +76,13 @@ public class StatsDReporterTest extends TestLogger {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
 
-		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
-		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+		configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
 		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
@@ -145,7 +146,7 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
@@ -219,7 +220,7 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 596fc82..3475f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -101,13 +101,13 @@ public class MetricRegistryConfiguration {
 
 		char delim;
 		try {
-			delim = configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+			delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
 		} catch (Exception e) {
 			LOG.warn("Failed to parse delimiter, using default delimiter.", e);
 			delim = '.';
 		}
 
-		final String definedReporters = configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+		final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
 		List<Tuple2<String, Configuration>> reporterConfigurations;
 
 		if (definedReporters == null) {
@@ -136,18 +136,12 @@ public class MetricRegistryConfiguration {
 	 * @return Scope formats extracted from the given configuration
 	 */
 	static ScopeFormats createScopeConfig(Configuration configuration) {
-		String jmFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+		String jmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM);
+		String jmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+		String tmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM);
+		String tmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+		String taskFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TASK);
+		String operatorFormat = configuration.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
 		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index adf0a85..f9efb88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,46 +70,15 @@ public abstract class ScopeFormat {
 
 	public static final String SCOPE_HOST = asVariable("host");
 
-	// ----- Job Manager ----
-
-	/** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
-		concat(SCOPE_HOST, "jobmanager");
-
-	/** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
-
 	// ----- Task Manager ----
 
 	public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
 
-	/** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-			concat(SCOPE_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
-
-	/** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
-
 	// ----- Job -----
 
 	public static final String SCOPE_JOB_ID = asVariable("job_id");
 	public static final String SCOPE_JOB_NAME = asVariable("job_name");
 
-	/** The default scope format for the job component: {@code "<job_name>"} */
-	public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
-
-	// ----- Job on Job Manager ----
-
-	/** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
-		concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
-	// ----- Job on Task Manager ----
-
-	/** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
 	// ----- Task ----
 
 	public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
@@ -118,27 +87,9 @@ public abstract class ScopeFormat {
 	public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num");
 	public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index");
 
-	/** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_TASK_COMPONENT =
-			concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-	/** The default scope format for all task metrics:
-	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_TASK_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT);
-
 	// ----- Operator ----
 
 	public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
-
-	/** The default scope added by the operator component: "<operator_name>.<subtask_index>" */
-	public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
-			concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-	/** The default scope format for all operator metrics:
-	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT);
 	
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bbbe6ba..bde93be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.metrics.scope;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,21 +41,21 @@ public class ScopeFormats {
 	 * Creates all default scope formats.
 	 */
 	public ScopeFormats() {
-		this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+		this.jobManagerFormat = new JobManagerScopeFormat(MetricOptions.SCOPE_NAMING_JM.defaultValue());
 
 		this.jobManagerJobFormat = new JobManagerJobScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
+			MetricOptions.SCOPE_NAMING_JM_JOB.defaultValue(), this.jobManagerFormat);
 
-		this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+		this.taskManagerFormat = new TaskManagerScopeFormat(MetricOptions.SCOPE_NAMING_TM.defaultValue());
 
 		this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+			MetricOptions.SCOPE_NAMING_TM_JOB.defaultValue(), this.taskManagerFormat);
 
 		this.taskFormat = new TaskScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat);
+				MetricOptions.SCOPE_NAMING_TASK.defaultValue(), this.taskManagerJobFormat);
 
 		this.operatorFormat = new OperatorScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat);
+			MetricOptions.SCOPE_NAMING_OPERATOR.defaultValue(), this.taskFormat);
 	}
 
 	/**
@@ -135,18 +135,12 @@ public class ScopeFormats {
 	 * @return The ScopeFormats parsed from the configuration
 	 */
 	public static ScopeFormats fromConfig(Configuration config) {
-		String jmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+		String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
+		String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+		String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
+		String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+		String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
+		String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
 		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index fe29ccb..b9502b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
@@ -69,7 +70,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterInstantiation() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
 		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -97,7 +98,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testMultipleReporterInstantiation() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
@@ -147,7 +148,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterArgumentForwarding() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
@@ -172,7 +173,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterScheduling() throws InterruptedException {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -215,7 +216,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testReporterNotifications() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
 
@@ -278,10 +279,10 @@ public class MetricRegistryTest extends TestLogger {
 	public void testScopeConfig() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+		config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+		config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+		config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
 
 		ScopeFormats scopeConfig = MetricRegistryConfiguration.createScopeConfig(config);
 
@@ -294,8 +295,8 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiter() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E");
+		config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
@@ -308,7 +309,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReporters() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -331,7 +332,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReportersInGroup() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3,test4");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -339,7 +340,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 		List<MetricReporter> reporters = registry.getReporters();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index da14bfd..04e40ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
@@ -77,8 +78,8 @@ public class AbstractMetricGroupTest {
 	@Test
 	public void testScopeCachingForMultipleReporters() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
@@ -166,7 +167,7 @@ public class AbstractMetricGroupTest {
 	@Test
 	public void testScopeGenerationWithoutReporters() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
 		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 317d19e..7834755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -110,7 +110,7 @@ public class JobManagerGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index ed4056f..d8bd57a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -54,8 +54,8 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -77,8 +77,8 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustomWildcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index f121b59..ace0236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -41,7 +42,7 @@ public class MetricGroupRegistrationTest {
 	@Test
 	public void testMetricInstantiation() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 965e8fb..11f3c0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -161,7 +161,7 @@ public class TaskManagerGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9bef6c..f9891cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -54,8 +54,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -76,8 +76,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustomWildcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter.<tm_id>");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index eb906d0..183237a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -63,9 +63,9 @@ public class TaskMetricGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -90,7 +90,7 @@ public class TaskMetricGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeWilcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		AbstractID executionId = new AbstractID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7569170..99c3f4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -177,10 +177,10 @@ public abstract class AbstractStreamOperator<OUT>
 			((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
 		}
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
-		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+		int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
 		if (historySize <= 0) {
-			LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
-			historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+			LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+			historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
 		}
 
 		latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));


[06/13] flink git commit: [FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark

Posted by se...@apache.org.
[FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark

This closes #3652


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea054a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea054a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea054a7d

Branch: refs/heads/master
Commit: ea054a7d3bc452d153c070b2789ddbe6a2f080a7
Parents: 7a70524
Author: mengji.fy <me...@taobao.com>
Authored: Thu Mar 30 11:12:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:00:32 2017 +0200

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 73 ++++++++-------
 .../ListViaRangeSpeedMiniBenchmark.java         | 99 +++++++++++---------
 2 files changed, 93 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
index 2a530e1..f3e084f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -53,52 +53,59 @@ public class ListViaMergeSpeedMiniBenchmark {
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+		try {
+			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
 
-		final int num = 50000;
+			final int num = 50000;
 
-		// ----- insert -----
-		System.out.println("begin insert");
+			// ----- insert -----
+			System.out.println("begin insert");
 
-		final long beginInsert = System.nanoTime();
-		for (int i = 0; i < num; i++) {
-			rocksDB.merge(write_options, keyBytes, valueBytes);
-		}
-		final long endInsert = System.nanoTime();
-		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				rocksDB.merge(write_options, keyBytes, valueBytes);
+			}
+			final long endInsert = System.nanoTime();
+			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
 
-		// ----- read (attempt 1) -----
+			// ----- read (attempt 1) -----
 
-		final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
-		final long beginGet1 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet1 = System.nanoTime();
+			final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+			final long beginGet1 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet1 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
 
-		// ----- read (attempt 2) -----
+			// ----- read (attempt 2) -----
 
-		final long beginGet2 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet2 = System.nanoTime();
+			final long beginGet2 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet2 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
 
-		// ----- compact -----
-		System.out.println("compacting...");
-		final long beginCompact = System.nanoTime();
-		rocksDB.compactRange();
-		final long endCompact = System.nanoTime();
+			// ----- compact -----
+			System.out.println("compacting...");
+			final long beginCompact = System.nanoTime();
+			rocksDB.compactRange();
+			final long endCompact = System.nanoTime();
 
-		System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+			System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
 
-		// ----- read (attempt 3) -----
+			// ----- read (attempt 3) -----
 
-		final long beginGet3 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet3 = System.nanoTime();
+			final long beginGet3 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet3 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+		} finally {
+			rocksDB.close();
+			options.close();
+			write_options.close();
+			FileUtils.deleteDirectory(rocksDir);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
index b427ef1..f46e2cd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -56,61 +56,68 @@ public class ListViaRangeSpeedMiniBenchmark {
 
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+		
+		try {
 
-		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
 
-		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+			final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
 
-		final Unsafe unsafe = MemoryUtils.UNSAFE;
-		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+			final Unsafe unsafe = MemoryUtils.UNSAFE;
+			final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
 
-		final int num = 50000;
-		System.out.println("begin insert");
+			final int num = 50000;
+			System.out.println("begin insert");
 
-		final long beginInsert = System.nanoTime();
-		for (int i = 0; i < num; i++) {
-			unsafe.putInt(keyTemplate, offset, i);
-			rocksDB.put(write_options, keyTemplate, valueBytes);
-		}
-		final long endInsert = System.nanoTime();
-		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
-		final byte[] resultHolder = new byte[num * valueBytes.length];
-
-		final long beginGet = System.nanoTime();
-
-		final RocksIterator iterator = rocksDB.newIterator();
-		int pos = 0;
-
-		// seek to start
-		unsafe.putInt(keyTemplate, offset, 0);
-		iterator.seek(keyTemplate);
-
-		// mark end
-		unsafe.putInt(keyTemplate, offset, -1);
-
-		// iterate
-		while (iterator.isValid()) {
-			byte[] currKey = iterator.key();
-			if (samePrefix(keyBytes, currKey)) {
-				byte[] currValue = iterator.value();
-				System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
-				pos += currValue.length;
-				iterator.next();
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				unsafe.putInt(keyTemplate, offset, i);
+				rocksDB.put(write_options, keyTemplate, valueBytes);
 			}
-			else {
-				break;
+			final long endInsert = System.nanoTime();
+			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+			final byte[] resultHolder = new byte[num * valueBytes.length];
+
+			final long beginGet = System.nanoTime();
+
+			final RocksIterator iterator = rocksDB.newIterator();
+			int pos = 0;
+
+			try {
+				// seek to start
+				unsafe.putInt(keyTemplate, offset, 0);
+				iterator.seek(keyTemplate);
+
+				// mark end
+				unsafe.putInt(keyTemplate, offset, -1);
+
+				// iterate
+				while (iterator.isValid()) {
+					byte[] currKey = iterator.key();
+					if (samePrefix(keyBytes, currKey)) {
+						byte[] currValue = iterator.value();
+						System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+						pos += currValue.length;
+						iterator.next();
+					} else {
+						break;
+					}
+				}
+			}finally {
+				iterator.close();
 			}
-		}
 
-		final long endGet = System.nanoTime();
+			final long endGet = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
-
-		// WriteOptions and RocksDB ultimately extends AbstractNativeReference, so we need to close resource as well.
-		write_options.close();
-		rocksDB.close();
+			System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+		} finally {
+			rocksDB.close();
+			options.close();
+			write_options.close();
+			FileUtils.deleteDirectory(rocksDir);
+		}
 	}
 
 	private static boolean samePrefix(byte[] prefix, byte[] key) {


[13/13] flink git commit: [FLINK-5672] [scripts] Add special cases for a local setup in cluster start/stop scripts

Posted by se...@apache.org.
[FLINK-5672] [scripts] Add special cases for a local setup in cluster start/stop scripts

This way, if all slaves refer to "localhost", we do not require ssh at all.

This closes #3298


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83061ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83061ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83061ad0

Branch: refs/heads/master
Commit: 83061ad0f34ab158b6aa7924bba1aa35695817f4
Parents: d6a1551
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 13 15:33:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 13:30:29 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 33 +++++++++++++++++++-
 .../src/main/flink-bin/bin/start-cluster.sh     | 14 ++-------
 .../src/main/flink-bin/bin/stop-cluster.sh      | 14 ++-------
 3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index dbfdd0e..a16529c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -410,16 +410,47 @@ readSlaves() {
 
     SLAVES=()
 
+    SLAVES_ALL_LOCALHOST=true
     GOON=true
     while $GOON; do
         read line || GOON=false
         HOST=$( extractHostName $line)
-        if [ -n "$HOST" ]; then
+        if [ -n "$HOST" ] ; then
             SLAVES+=(${HOST})
+            if [ "${HOST}" != "localhost" ] ; then
+                SLAVES_ALL_LOCALHOST=false
+            fi
         fi
     done < "$SLAVES_FILE"
 }
 
+# starts or stops TMs on all slaves
+# TMSlaves start|stop
+TMSlaves() {
+    CMD=$1
+
+    readSlaves
+
+    if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
+        # all-local setup
+        for slave in ${SLAVES[@]}; do
+            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+        done
+    else
+        # non-local setup
+        # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+        command -v pdsh >/dev/null 2>&1
+        if [[ $? -ne 0 ]]; then
+            for slave in ${SLAVES[@]}; do
+                ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+            done
+        else
+            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
+                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+        fi
+    fi
+}
+
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 7611189..5d2f92b 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -49,15 +49,5 @@ else
 fi
 shopt -u nocasematch
 
-# Start TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
-readSlaves
-
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
-    for slave in ${SLAVES[@]}; do
-        ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start &"
-    done
-else
-    PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND="${FLINK_SSH_OPTS}" pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
-        "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start"
-fi
+# Start TaskManager instance(s)
+TMSlaves start

http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index bc86291..e04d2fa 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -22,18 +22,8 @@ bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-# Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
-readSlaves
-
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
-    for slave in ${SLAVES[@]}; do
-        ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
-    done
-else
-    PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
-        "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
-fi
+# Stop TaskManager instance(s)
+TMSlaves stop
 
 # Stop JobManager instance(s)
 shopt -s nocasematch


[10/13] flink git commit: [FLINK-5962] [runtime-web] [tests] Replace deprecated usage of JsonNode#getValueAsText

Posted by se...@apache.org.
[FLINK-5962] [runtime-web] [tests] Replace deprecated usage of JsonNode#getValueAsText

This closes #3679


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cad3097
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cad3097
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cad3097

Branch: refs/heads/master
Commit: 3cad30974e3bf16d1604096f2b0e138768ae795b
Parents: 1b77cc3
Author: zentol <ch...@apache.org>
Authored: Wed Apr 5 23:50:12 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:32:32 2017 +0200

----------------------------------------------------------------------
 ...obCancellationWithSavepointHandlersTest.java | 44 ++++++++++----------
 1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3cad3097/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 8b948aa..8c2d3fc 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import akka.dispatch.ExecutionContexts$;
 import akka.dispatch.Futures;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -32,8 +34,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.ExecutionContext;
@@ -201,9 +201,9 @@ public class JobCancellationWithSavepointHandlersTest {
 		String json = response.content().toString(Charset.forName("UTF-8"));
 		JsonNode root = new ObjectMapper().readTree(json);
 
-		assertEquals("accepted", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
-		assertEquals(location, root.get("location").getValueAsText());
+		assertEquals("accepted", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals(location, root.get("location").asText());
 
 		// Trigger again
 		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
@@ -215,9 +215,9 @@ public class JobCancellationWithSavepointHandlersTest {
 		json = response.content().toString(Charset.forName("UTF-8"));
 		root = new ObjectMapper().readTree(json);
 
-		assertEquals("accepted", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
-		assertEquals(location, root.get("location").getValueAsText());
+		assertEquals("accepted", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals(location, root.get("location").asText());
 
 		// Only single actual request
 		verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
@@ -233,8 +233,8 @@ public class JobCancellationWithSavepointHandlersTest {
 		json = response.content().toString(Charset.forName("UTF-8"));
 		root = new ObjectMapper().readTree(json);
 
-		assertEquals("in-progress", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
+		assertEquals("in-progress", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
 
 		// Complete
 		promise.success(new CancellationSuccess(jobId, "_path-savepoint_"));
@@ -249,9 +249,9 @@ public class JobCancellationWithSavepointHandlersTest {
 
 		root = new ObjectMapper().readTree(json);
 
-		assertEquals("success", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
-		assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+		assertEquals("success", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
 
 		// Query again, keep recent history
 
@@ -265,9 +265,9 @@ public class JobCancellationWithSavepointHandlersTest {
 
 		root = new ObjectMapper().readTree(json);
 
-		assertEquals("success", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
-		assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+		assertEquals("success", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
 
 		// Query for unknown request
 		params.put("requestId", "9929");
@@ -281,9 +281,9 @@ public class JobCancellationWithSavepointHandlersTest {
 
 		root = new ObjectMapper().readTree(json);
 
-		assertEquals("failed", root.get("status").getValueAsText());
-		assertEquals("9929", root.get("request-id").getValueAsText());
-		assertEquals("Unknown job/request ID", root.get("cause").getValueAsText());
+		assertEquals("failed", root.get("status").asText());
+		assertEquals("9929", root.get("request-id").asText());
+		assertEquals("Unknown job/request ID", root.get("cause").asText());
 	}
 
 	/**
@@ -327,8 +327,8 @@ public class JobCancellationWithSavepointHandlersTest {
 		String json = response.content().toString(Charset.forName("UTF-8"));
 		JsonNode root = new ObjectMapper().readTree(json);
 
-		assertEquals("failed", root.get("status").getValueAsText());
-		assertEquals("1", root.get("request-id").getValueAsText());
-		assertEquals("Test Exception", root.get("cause").getValueAsText());
+		assertEquals("failed", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("Test Exception", root.get("cause").asText());
 	}
 }


[02/13] flink git commit: [hotfix] [docs] Typo sick -> sink

Posted by se...@apache.org.
[hotfix] [docs] Typo sick -> sink

This closes #3749


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f75466fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f75466fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f75466fe

Branch: refs/heads/master
Commit: f75466fecf494e5da700488144ae79667f2ed27e
Parents: c974684
Author: Marc Tremblay <ma...@gmail.com>
Authored: Fri Apr 21 00:22:14 2017 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:16:24 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/filesystem_sink.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f75466fe/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 67250f0..d12752f 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -44,7 +44,7 @@ cluster execution.
 #### Bucketing File Sink
 
 The bucketing behaviour as well as the writing can be configured but we will get to that later.
-This is how you can create a bucketing sick which by default, sinks to rolling files that are split by time:
+This is how you can create a bucketing sink which by default, sinks to rolling files that are split by time:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">