You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/04/21 12:24:15 UTC
[01/13] flink git commit: [FLINK-5623] [runtime] Fix TempBarrier dam has been closed
Repository: flink
Updated Branches:
refs/heads/master f9eac3afd -> 83061ad0f
[FLINK-5623] [runtime] Fix TempBarrier dam has been closed
Properly reset the "pipeline breaker" upon closing.
This closes #3747
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9746846
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9746846
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9746846
Branch: refs/heads/master
Commit: c9746846b357d8ce538ff872cea60c52b1904b43
Parents: f9eac3a
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Apr 20 08:46:01 2017 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:15:26 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/graph/library/link_analysis/PageRank.java | 5 -----
.../main/java/org/apache/flink/runtime/operators/BatchTask.java | 1 +
2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index c5c4178..747735e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -45,7 +45,6 @@ import org.apache.flink.graph.asm.result.UnaryResult;
import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
import org.apache.flink.graph.library.link_analysis.PageRank.Result;
import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
@@ -176,10 +175,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.run(new VertexDegrees<K, VV, EV>()
.setParallelism(parallelism));
- // prevent Exception "The dam has been closed." in TempBarrier
- // for a simplified Graph as in PageRankITCase (see FLINK-5623)
- vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>());
-
// vertex count
DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index f748079..87b0a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -884,6 +884,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
// close the async barrier if there is one
if (this.tempBarriers[i] != null) {
this.tempBarriers[i].close();
+ this.tempBarriers[i] = null;
}
// recreate the local strategy
[05/13] flink git commit: [misc] Closing commit for outdated pull request
Posted by se...@apache.org.
[misc] Closing commit for outdated pull request
This closes #3592
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a70524a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a70524a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a70524a
Branch: refs/heads/master
Commit: 7a70524acfe85c5b634f18f3cbe6dd32be302e37
Parents: eef85e0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 11:05:00 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 11:05:00 2017 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[11/13] flink git commit: [hotfix] [docs] Remove empty Docker docs page
Posted by se...@apache.org.
[hotfix] [docs] Remove empty Docker docs page
This will be re-added when we have a coherent plan for Docker support
and after the official Docker images are available.
This closes #3658
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918d25ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918d25ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918d25ef
Branch: refs/heads/master
Commit: 918d25efffc3a59347cc5ac96e56f9eadff2d716
Parents: 3cad309
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Mar 31 17:00:21 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:35:06 2017 +0200
----------------------------------------------------------------------
docs/setup/docker.md | 29 -----------------------------
1 file changed, 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/918d25ef/docs/setup/docker.md
----------------------------------------------------------------------
diff --git a/docs/setup/docker.md b/docs/setup/docker.md
deleted file mode 100644
index 8b9757f..0000000
--- a/docs/setup/docker.md
+++ /dev/null
@@ -1,29 +0,0 @@
----
-title: "Docker Setup"
-nav-title: Docker
-nav-parent_id: deployment
-nav-pos: 4
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-
[07/13] flink git commit: [hotfix] [rocksdb] Convert performance benchmarks to unit tests
Posted by se...@apache.org.
[hotfix] [rocksdb] Convert performance benchmarks to unit tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05065451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05065451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05065451
Branch: refs/heads/master
Commit: 05065451abf5917f796b00692f43f0a2d2f3ed48
Parents: ea054a7
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 12:19:04 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:19:04 2017 +0200
----------------------------------------------------------------------
.../ListViaMergeSpeedMiniBenchmark.java | 111 ----------
.../ListViaRangeSpeedMiniBenchmark.java | 132 ------------
.../state/benchmark/RocksDBPerformanceTest.java | 204 +++++++++++++++++++
3 files changed, 204 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
deleted file mode 100644
index f3e084f..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-
-public class ListViaMergeSpeedMiniBenchmark {
-
- public static void main(String[] args) throws Exception {
- final File rocksDir = new File("/tmp/rdb");
- FileUtils.deleteDirectory(rocksDir);
-
- final Options options = new Options()
- .setCompactionStyle(CompactionStyle.LEVEL)
- .setLevelCompactionDynamicLevelBytes(true)
- .setIncreaseParallelism(4)
- .setUseFsync(false)
- .setMaxOpenFiles(-1)
- .setDisableDataSync(true)
- .setCreateIfMissing(true)
- .setMergeOperator(new StringAppendOperator());
-
- final WriteOptions write_options = new WriteOptions()
- .setSync(false)
- .setDisableWAL(true);
-
- final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
-
- final String key = "key";
- final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
- try {
- final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
- final int num = 50000;
-
- // ----- insert -----
- System.out.println("begin insert");
-
- final long beginInsert = System.nanoTime();
- for (int i = 0; i < num; i++) {
- rocksDB.merge(write_options, keyBytes, valueBytes);
- }
- final long endInsert = System.nanoTime();
- System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
- // ----- read (attempt 1) -----
-
- final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
- final long beginGet1 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet1 = System.nanoTime();
-
- System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
-
- // ----- read (attempt 2) -----
-
- final long beginGet2 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet2 = System.nanoTime();
-
- System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
-
- // ----- compact -----
- System.out.println("compacting...");
- final long beginCompact = System.nanoTime();
- rocksDB.compactRange();
- final long endCompact = System.nanoTime();
-
- System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
-
- // ----- read (attempt 3) -----
-
- final long beginGet3 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet3 = System.nanoTime();
-
- System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
- } finally {
- rocksDB.close();
- options.close();
- write_options.close();
- FileUtils.deleteDirectory(rocksDir);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
deleted file mode 100644
index f46e2cd..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-import sun.misc.Unsafe;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-
-public class ListViaRangeSpeedMiniBenchmark {
-
- public static void main(String[] args) throws Exception {
- final File rocksDir = new File("/tmp/rdb");
- FileUtils.deleteDirectory(rocksDir);
-
- final Options options = new Options()
- .setCompactionStyle(CompactionStyle.LEVEL)
- .setLevelCompactionDynamicLevelBytes(true)
- .setIncreaseParallelism(4)
- .setUseFsync(false)
- .setMaxOpenFiles(-1)
- .setDisableDataSync(true)
- .setCreateIfMissing(true)
- .setMergeOperator(new StringAppendOperator());
-
- final WriteOptions write_options = new WriteOptions()
- .setSync(false)
- .setDisableWAL(true);
-
- final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
-
- final String key = "key";
- final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
- try {
-
- final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
- final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
-
- final Unsafe unsafe = MemoryUtils.UNSAFE;
- final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
-
- final int num = 50000;
- System.out.println("begin insert");
-
- final long beginInsert = System.nanoTime();
- for (int i = 0; i < num; i++) {
- unsafe.putInt(keyTemplate, offset, i);
- rocksDB.put(write_options, keyTemplate, valueBytes);
- }
- final long endInsert = System.nanoTime();
- System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
- final byte[] resultHolder = new byte[num * valueBytes.length];
-
- final long beginGet = System.nanoTime();
-
- final RocksIterator iterator = rocksDB.newIterator();
- int pos = 0;
-
- try {
- // seek to start
- unsafe.putInt(keyTemplate, offset, 0);
- iterator.seek(keyTemplate);
-
- // mark end
- unsafe.putInt(keyTemplate, offset, -1);
-
- // iterate
- while (iterator.isValid()) {
- byte[] currKey = iterator.key();
- if (samePrefix(keyBytes, currKey)) {
- byte[] currValue = iterator.value();
- System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
- pos += currValue.length;
- iterator.next();
- } else {
- break;
- }
- }
- }finally {
- iterator.close();
- }
-
- final long endGet = System.nanoTime();
-
- System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
- } finally {
- rocksDB.close();
- options.close();
- write_options.close();
- FileUtils.deleteDirectory(rocksDir);
- }
- }
-
- private static boolean samePrefix(byte[] prefix, byte[] key) {
- for (int i = 0; i < prefix.length; i++) {
- if (prefix[i] != key [i]) {
- return false;
- }
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
new file mode 100644
index 0000000..011703e
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Test that validates that the performance of RocksDB is as expected.
+ * This test guards against the bug filed as 'FLINK-5756'
+ */
+public class RocksDBPerformanceTest extends TestLogger {
+
+ @Rule
+ public final TemporaryFolder TMP = new TemporaryFolder();
+
+ @Test(timeout = 2000)
+ public void testRocksDbMergePerformance() throws Exception {
+ final File rocksDir = TMP.newFolder("rdb");
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final int num = 50000;
+
+ try (
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
+ {
+ // ----- insert -----
+ log.info("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ rocksDB.merge(write_options, keyBytes, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
+
+ // ----- read (attempt 1) -----
+
+ final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+ final long beginGet1 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet1 = System.nanoTime();
+
+ log.info("end get - duration: {} ms", (endGet1 - beginGet1) / 1_000_000);
+
+ // ----- read (attempt 2) -----
+
+ final long beginGet2 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet2 = System.nanoTime();
+
+ log.info("end get - duration: {} ms", (endGet2 - beginGet2) / 1_000_000);
+
+ // ----- compact -----
+ log.info("compacting...");
+ final long beginCompact = System.nanoTime();
+ rocksDB.compactRange();
+ final long endCompact = System.nanoTime();
+
+ log.info("end compaction - duration: {} ms", (endCompact - beginCompact) / 1_000_000);
+
+ // ----- read (attempt 3) -----
+
+ final long beginGet3 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet3 = System.nanoTime();
+
+ log.info("end get - duration: {} ms", (endGet3 - beginGet3) / 1_000_000);
+ }
+ }
+
+ @Test(timeout = 2000)
+ public void testRocksDbRangeGetPerformance() throws Exception {
+ final File rocksDir = TMP.newFolder("rdb");
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final int num = 50000;
+
+ try (
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
+ {
+ final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+ final Unsafe unsafe = MemoryUtils.UNSAFE;
+ final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+ log.info("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ unsafe.putInt(keyTemplate, offset, i);
+ rocksDB.put(write_options, keyTemplate, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
+
+ @SuppressWarnings("MismatchedReadAndWriteOfArray")
+ final byte[] resultHolder = new byte[num * valueBytes.length];
+
+ final long beginGet = System.nanoTime();
+
+ int pos = 0;
+
+ try (final RocksIterator iterator = rocksDB.newIterator()) {
+ // seek to start
+ unsafe.putInt(keyTemplate, offset, 0);
+ iterator.seek(keyTemplate);
+
+ // iterate
+ while (iterator.isValid() && samePrefix(keyBytes, iterator.key())) {
+ byte[] currValue = iterator.value();
+ System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+ pos += currValue.length;
+ iterator.next();
+ }
+ }
+
+ final long endGet = System.nanoTime();
+
+ log.info("end get - duration: {} ms", (endGet - beginGet) / 1_000_000);
+ }
+ }
+
+
+ private static boolean samePrefix(byte[] prefix, byte[] key) {
+ for (int i = 0; i < prefix.length; i++) {
+ if (prefix[i] != key [i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
[08/13] flink git commit: [FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries
Posted by se...@apache.org.
[FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries
This closes #3706
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ea74a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ea74a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ea74a0
Branch: refs/heads/master
Commit: c0ea74a0a4eedf4fe73e8f383e837ea373216476
Parents: 0506545
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Mon Apr 10 16:57:48 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:22:00 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/cep/nfa/SharedBuffer.java | 5 ++--
.../apache/flink/cep/nfa/SharedBufferTest.java | 28 ++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index ccc6884..43c2aca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -274,7 +274,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
public void release(final K key, final V value, final long timestamp) {
SharedBufferEntry<K, V> entry = get(key, value, timestamp);
if (entry != null) {
- entry.decreaseReferenceCounter();
internalRemove(entry);
}
}
@@ -493,13 +492,13 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
while (!entriesToRemove.isEmpty()) {
SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
+ currentEntry.decreaseReferenceCounter();
if (currentEntry.getReferenceCounter() == 0) {
currentEntry.remove();
- for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
+ for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
if (edge.getTarget() != null) {
- edge.getTarget().decreaseReferenceCounter();
entriesToRemove.push(edge.getTarget());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index f0a25d2..adc07b3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SharedBufferTest extends TestLogger {
@@ -138,4 +139,31 @@ public class SharedBufferTest extends TestLogger {
assertEquals(sharedBuffer, copy);
}
+
+ @Test
+ public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
+ SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+ int numberEvents = 8;
+ Event[] events = new Event[numberEvents];
+ final long timestamp = 1L;
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ }
+
+ sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
+ sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
+ sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
+ sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
+ sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
+ sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
+
+ //simulate IGNORE (next event can point to events[2])
+ sharedBuffer.lock("branching", events[2], timestamp);
+
+ sharedBuffer.release("branching", events[4], timestamp);
+
+ //There should be still events[1] and events[2] in the buffer
+ assertFalse(sharedBuffer.isEmpty());
+ }
}
[04/13] flink git commit: [FLINK-6117] [security] Make setting of 'zookeeper.sasl.disable' work correctly
Posted by se...@apache.org.
[FLINK-6117] [security] Make setting of 'zookeeper.sasl.disable' work correctly
This closes #3600
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eef85e09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eef85e09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eef85e09
Branch: refs/heads/master
Commit: eef85e095a8a0e4c4553631b74ba7b9f173cebf0
Parents: daf4038
Author: zcb <20...@qq.com>
Authored: Thu Mar 23 03:44:10 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:37:44 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/configuration/SecurityOptions.java | 4 ++++
.../org/apache/flink/runtime/security/SecurityUtils.java | 7 +++++++
.../flink/runtime/security/modules/ZooKeeperModule.java | 10 ++++++++++
.../org/apache/flink/test/util/SecureTestEnvironment.java | 1 +
4 files changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 95cf0c7..3763198 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -55,6 +55,10 @@ public class SecurityOptions {
// ZooKeeper Security Options
// ------------------------------------------------------------------------
+ public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+ key("zookeeper.sasl.disable")
+ .defaultValue(false);
+
public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
key("zookeeper.sasl.service-name")
.defaultValue("zookeeper");
http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d76e7a5..7a09c32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -125,6 +125,8 @@ public class SecurityUtils {
private final org.apache.hadoop.conf.Configuration hadoopConf;
+ private final boolean isZkSaslDisable;
+
private final boolean useTicketCache;
private final String keytab;
@@ -164,6 +166,7 @@ public class SecurityUtils {
org.apache.hadoop.conf.Configuration hadoopConf,
List<? extends Class<? extends SecurityModule>> securityModules) {
this.hadoopConf = checkNotNull(hadoopConf);
+ this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
@@ -175,6 +178,10 @@ public class SecurityUtils {
validate();
}
+ public boolean isZkSaslDisable() {
+ return isZkSaslDisable;
+ }
+
public String getKeytab() {
return keytab;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index c0ba4a5..216bdde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -41,6 +41,8 @@ public class ZooKeeperModule implements SecurityModule {
*/
private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+ private String priorSaslEnable;
+
private String priorServiceName;
private String priorLoginContextName;
@@ -48,6 +50,9 @@ public class ZooKeeperModule implements SecurityModule {
@Override
public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+ priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
+ System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!configuration.isZkSaslDisable()));
+
priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
@@ -61,6 +66,11 @@ public class ZooKeeperModule implements SecurityModule {
@Override
public void uninstall() throws SecurityInstallException {
+ if(priorSaslEnable != null) {
+ System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
+ } else {
+ System.clearProperty(ZK_ENABLE_CLIENT_SASL);
+ }
if(priorServiceName != null) {
System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index febd074..98deee6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -138,6 +138,7 @@ public class SecureTestEnvironment {
//ctx.setHadoopConfiguration() for the UGI implementation to work properly.
//See Yarn test case module for reference
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
+ flinkConfig.setBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE, false);
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);
[12/13] flink git commit: [FLINK-6236] [docs] Mention that resume
from savepoint is possible via the web UI
Posted by se...@apache.org.
[FLINK-6236] [docs] Mention that resume from savepoint is possible via the web UI
This closes #3657
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6a1551f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6a1551f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6a1551f
Branch: refs/heads/master
Commit: d6a1551f90edfa534a57244a3be46e54e99dfeae
Parents: 918d25e
Author: rami-alisawi <ra...@users.noreply.github.com>
Authored: Fri Mar 31 17:11:29 2017 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:39:05 2017 +0200
----------------------------------------------------------------------
docs/setup/savepoints.md | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d6a1551f/docs/setup/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md
index da0784d..4bdc43f 100644
--- a/docs/setup/savepoints.md
+++ b/docs/setup/savepoints.md
@@ -69,6 +69,8 @@ In the above example, the print sink is stateless and hence not part of the save
You can use the [command line client]({{ site.baseurl }}/setup/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+With Flink >= 1.2.0, it is also possible to *resume from savepoints* using the web UI.
+
### Triggering Savepoints
When triggering a savepoint, a single savepoint file will be created that contains the checkpoint *meta data*. The actual checkpoint state will be kept around in the configured checkpoint directory. For example with a `FsStateBackend` or `RocksDBStateBackend`:
[03/13] flink git commit: [FLINK-6176] [scripts] [yarn] [mesos] Add
JARs to CLASSPATH deterministically
Posted by se...@apache.org.
[FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically
Sorts files read from Flink's lib directory and places the distribution
JAR at the end of the CLASSPATH.
This closes #3632
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daf4038c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daf4038c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daf4038c
Branch: refs/heads/master
Commit: daf4038c88b459084a1232c69fb584a7e2e100da
Parents: f75466f
Author: Greg Hogan <co...@greghogan.com>
Authored: Sun Mar 26 15:46:00 2017 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:30:44 2017 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/bin/config.sh | 18 +++++++++--
.../main/flink-bin/mesos-bin/mesos-appmaster.sh | 17 +----------
.../flink-bin/mesos-bin/mesos-taskmanager.sh | 17 +----------
.../src/main/flink-bin/yarn-bin/yarn-session.sh | 17 +----------
.../yarn/AbstractYarnClusterDescriptor.java | 32 +++++++++++---------
5 files changed, 35 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 5ef4ef7..dbfdd0e 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -18,16 +18,28 @@
################################################################################
constructFlinkClassPath() {
+ local FLINK_DIST
+ local FLINK_CLASSPATH
while read -d '' -r jarfile ; do
- if [[ $FLINK_CLASSPATH = "" ]]; then
+ if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then
+ FLINK_DIST="$FLINK_DIST":"$jarfile"
+ elif [[ "$FLINK_CLASSPATH" == "" ]]; then
FLINK_CLASSPATH="$jarfile";
else
FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
fi
- done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
- echo $FLINK_CLASSPATH
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_CLASSPATH""$FLINK_DIST"
}
# These are used to mangle paths that are passed to java when using
http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
index 6fae6c2..67eab9d 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -23,26 +23,11 @@ bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
-# auxilliary function to construct a lightweight classpath for the
-# Flink AppMaster
-constructAppMasterClassPath() {
-
- while read -d '' -r jarfile ; do
- if [[ $CC_CLASSPATH = "" ]]; then
- CC_CLASSPATH="$jarfile";
- else
- CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
- fi
- done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
- echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
-CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
index 23a301b..ab2f7b1 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -23,22 +23,7 @@ bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
-# auxilliary function to construct a lightweight classpath for the
-# Flink TaskManager
-constructTaskManagerClassPath() {
-
- while read -d '' -r jarfile ; do
- if [[ $CC_CLASSPATH = "" ]]; then
- CC_CLASSPATH="$jarfile";
- else
- CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
- fi
- done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
- echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=flink-taskmanager.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 1755f32..03b1e3a 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -29,22 +29,7 @@ fi
JVM_ARGS="$JVM_ARGS -Xmx512m"
-# auxilliary function to construct a lightweight classpath for the
-# Flink CLI client
-constructCLIClientClassPath() {
-
- while read -d '' -r jarfile ; do
- if [[ $CC_CLASSPATH = "" ]]; then
- CC_CLASSPATH="$jarfile";
- else
- CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
- fi
- done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
- echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ec7af5a..a5a6c36 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,10 +23,10 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -63,10 +63,10 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
@@ -669,12 +669,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size());
// list of remote paths (after upload)
final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size());
- // classpath assembler
- final StringBuilder classPathBuilder = new StringBuilder();
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();
// upload and register ship files
+ final List<String> classPaths = new ArrayList<>();
for (File shipFile : effectiveShipFiles) {
LocalResource shipResources = Records.newRecord(LocalResource.class);
@@ -693,29 +692,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
@Override
- public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs)
+ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
throws IOException {
- super.preVisitDirectory(dir, attrs);
-
- java.nio.file.Path relativePath = parentPath.relativize(dir);
+ java.nio.file.Path relativePath = parentPath.relativize(file);
- classPathBuilder
- .append(relativePath)
- .append(File.separator)
- .append("*")
- .append(File.pathSeparator);
+ classPaths.add(relativePath.toString());
return FileVisitResult.CONTINUE;
}
});
} else {
// add files to the classpath
- classPathBuilder.append(shipFile.getName()).append(File.pathSeparator);
+ classPaths.add(shipFile.getName());
}
envShipFileList.append(remotePath).append(",");
}
+ // normalize classpath by sorting
+ Collections.sort(classPaths);
+
+ // classpath assembler
+ StringBuilder classPathBuilder = new StringBuilder();
+ for (String classPath : classPaths) {
+ classPathBuilder.append(classPath).append(File.pathSeparator);
+ }
+
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
LocalResource flinkConf = Records.newRecord(LocalResource.class);
[09/13] flink git commit: [FLINK-4769] [metrics] Port metric config
parameters to ConfigOptions
Posted by se...@apache.org.
[FLINK-4769] [metrics] Port metric config parameters to ConfigOptions
This closes #3687
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b77cc3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b77cc3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b77cc3f
Branch: refs/heads/master
Commit: 1b77cc3f2682afd839ce6b16cd1658bbd01a6b04
Parents: c0ea74a
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 13:47:33 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:31:30 2017 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaTestBase.java | 3 +-
.../flink/configuration/ConfigConstants.java | 44 +++++-----
.../flink/configuration/MetricOptions.java | 88 ++++++++++++++++++++
.../ScheduledDropwizardReporterTest.java | 7 +-
.../DropwizardFlinkHistogramWrapperTest.java | 3 +-
.../flink/metrics/jmx/JMXReporterTest.java | 9 +-
.../jobmanager/JMXJobManagerMetricTest.java | 5 +-
.../metrics/statsd/StatsDReporterTest.java | 11 +--
.../metrics/MetricRegistryConfiguration.java | 24 ++----
.../runtime/metrics/scope/ScopeFormat.java | 49 -----------
.../runtime/metrics/scope/ScopeFormats.java | 32 +++----
.../runtime/metrics/MetricRegistryTest.java | 29 +++----
.../metrics/groups/AbstractMetricGroupTest.java | 7 +-
.../metrics/groups/JobManagerGroupTest.java | 4 +-
.../metrics/groups/JobManagerJobGroupTest.java | 10 +--
.../groups/MetricGroupRegistrationTest.java | 3 +-
.../metrics/groups/TaskManagerGroupTest.java | 4 +-
.../metrics/groups/TaskManagerJobGroupTest.java | 10 +--
.../metrics/groups/TaskMetricGroupTest.java | 10 +--
.../api/operators/AbstractStreamOperator.java | 8 +-
20 files changed, 196 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 0c6bfa9..a21a239 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -116,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+ flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
return flinkConfig;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a035beb..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -990,21 +990,8 @@ public final class ConfigConstants {
// ---------------------------- Metrics -----------------------------------
- /**
- * The list of named reporters. Names are defined here and per-reporter configs
- * are given with the reporter config prefix and the reporter name.
- *
- * Example:
- * <pre>{@code
- * metrics.reporters = foo, bar
- *
- * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
- * metrics.reporter.foo.interval = 10
- *
- * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
- * metrics.reporter.bar.port = 1337
- * }</pre>
- */
+ /** @deprecated Use {@link MetricOptions#REPORTERS_LIST} instead. */
+ @Deprecated
public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
/**
@@ -1022,28 +1009,36 @@ public final class ConfigConstants {
/** The delimiter used to assemble the metric identifier. This is used as a suffix in an actual reporter config. */
public static final String METRICS_REPORTER_SCOPE_DELIMITER = "scope.delimiter";
- /** The delimiter used to assemble the metric identifier. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_DELIMITER} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter";
- /** The scope format string that is applied to all metrics scoped to a JobManager. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
- /** The scope format string that is applied to all metrics scoped to a TaskManager. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
- /** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM_JOB} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
- /** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM_JOB} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
- /** The scope format string that is applied to all metrics scoped to a task. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TASK} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
- /** The scope format string that is applied to all metrics scoped to an operator. */
+ /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_OPERATOR} instead. */
+ @Deprecated
public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
- /** The number of measured latencies to maintain at each operator */
+ /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+ @Deprecated
public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size";
@@ -1498,7 +1493,8 @@ public final class ConfigConstants {
// ----------------------------- Metrics ----------------------------
- /** The default number of measured latencies to maintain at each operator */
+ /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+ @Deprecated
public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128;
// ----------------------------- Environment Variables ----------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
new file mode 100644
index 0000000..8a328ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class MetricOptions {
+
+ /**
+ * The list of named reporters. Names are defined here and per-reporter configs
+ * are given with the reporter config prefix and the reporter name.
+ *
+ * Example:
+ * <pre>{@code
+ * metrics.reporters = foo, bar
+ *
+ * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
+ * metrics.reporter.foo.interval = 10
+ *
+ * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
+ * metrics.reporter.bar.port = 1337
+ * }</pre>
+ */
+ public static final ConfigOption<String> REPORTERS_LIST =
+ key("metrics.reporters")
+ .noDefaultValue();
+
+ /** The delimiter used to assemble the metric identifier. */
+ public static final ConfigOption<String> SCOPE_DELIMITER =
+ key("metrics.scope.delimiter")
+ .defaultValue(".");
+
+ /** The scope format string that is applied to all metrics scoped to a JobManager. */
+ public static final ConfigOption<String> SCOPE_NAMING_JM =
+ key("metrics.scope.jm")
+ .defaultValue("<host>.jobmanager");
+
+ /** The scope format string that is applied to all metrics scoped to a TaskManager. */
+ public static final ConfigOption<String> SCOPE_NAMING_TM =
+ key("metrics.scope.tm")
+ .defaultValue("<host>.taskmanager.<tm_id>");
+
+ /** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+ public static final ConfigOption<String> SCOPE_NAMING_JM_JOB =
+ key("metrics.scope.jm.job")
+ .defaultValue("<host>.jobmanager.<job_name>");
+
+ /** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+ public static final ConfigOption<String> SCOPE_NAMING_TM_JOB =
+ key("metrics.scope.tm.job")
+ .defaultValue("<host>.taskmanager.<tm_id>.<job_name>");
+
+ /** The scope format string that is applied to all metrics scoped to a task. */
+ public static final ConfigOption<String> SCOPE_NAMING_TASK =
+ key("metrics.scope.task")
+ .defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
+
+ /** The scope format string that is applied to all metrics scoped to an operator. */
+ public static final ConfigOption<String> SCOPE_NAMING_OPERATOR =
+ key("metrics.scope.operator")
+ .defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
+
+ /** The number of measured latencies to maintain at each operator. */
+ public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
+ key("metrics.latency.history-size")
+ .defaultValue(128);
+
+ private MetricOptions() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 87c4ccb..73e7f0b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.ScheduledReporter;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -76,13 +77,13 @@ public class ScheduledDropwizardReporterTest {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
- configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ configuration.setString(MetricOptions.REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
- configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
- configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+ configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+ configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 2f2a536..8f7796c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
@@ -96,7 +97,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
int size = 10;
String histogramMetricName = "histogram";
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+ config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 1a96287..85ab897 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.metrics.jmx;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.metrics.reporter.MetricReporter;
@@ -94,7 +95,7 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+ cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
@@ -154,7 +155,7 @@ public class JMXReporterTest extends TestLogger {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
- cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+ cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
@@ -230,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+ config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -280,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+ config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 934a621..6d54db3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -55,11 +56,11 @@ public class JMXJobManagerMetricTest {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
- flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
- flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
+ flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
TestingCluster flink = new TestingCluster(flinkConfiguration);
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 17fd65a..9215c3f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.statsd;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
@@ -75,13 +76,13 @@ public class StatsDReporterTest extends TestLogger {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
- configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ configuration.setString(MetricOptions.REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
- configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
- configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+ configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+ configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
@@ -145,7 +146,7 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
@@ -219,7 +220,7 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 596fc82..3475f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -101,13 +101,13 @@ public class MetricRegistryConfiguration {
char delim;
try {
- delim = configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+ delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
} catch (Exception e) {
LOG.warn("Failed to parse delimiter, using default delimiter.", e);
delim = '.';
}
- final String definedReporters = configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+ final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
List<Tuple2<String, Configuration>> reporterConfigurations;
if (definedReporters == null) {
@@ -136,18 +136,12 @@ public class MetricRegistryConfiguration {
* @return Scope formats extracted from the given configuration
*/
static ScopeFormats createScopeConfig(Configuration configuration) {
- String jmFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
- String jmJobFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
- String tmFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
- String tmJobFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
- String taskFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
- String operatorFormat = configuration.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+ String jmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM);
+ String jmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+ String tmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM);
+ String tmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+ String taskFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TASK);
+ String operatorFormat = configuration.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index adf0a85..f9efb88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,46 +70,15 @@ public abstract class ScopeFormat {
public static final String SCOPE_HOST = asVariable("host");
- // ----- Job Manager ----
-
- /** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
- public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
- concat(SCOPE_HOST, "jobmanager");
-
- /** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
- public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
-
// ----- Task Manager ----
public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
- /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
- public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
- concat(SCOPE_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
-
- /** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
- public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
-
// ----- Job -----
public static final String SCOPE_JOB_ID = asVariable("job_id");
public static final String SCOPE_JOB_NAME = asVariable("job_name");
- /** The default scope format for the job component: {@code "<job_name>"} */
- public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
-
- // ----- Job on Job Manager ----
-
- /** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
- public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
- concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
- // ----- Job on Task Manager ----
-
- /** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
- public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
- concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
// ----- Task ----
public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
@@ -118,27 +87,9 @@ public abstract class ScopeFormat {
public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num");
public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index");
- /** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */
- public static final String DEFAULT_SCOPE_TASK_COMPONENT =
- concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
- /** The default scope format for all task metrics:
- * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
- public static final String DEFAULT_SCOPE_TASK_GROUP =
- concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT);
-
// ----- Operator ----
public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
-
- /** The default scope added by the operator component: "<operator_name>.<subtask_index>" */
- public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
- concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
- /** The default scope format for all operator metrics:
- * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
- public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
- concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT);
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bbbe6ba..bde93be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.metrics.scope;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,21 +41,21 @@ public class ScopeFormats {
* Creates all default scope formats.
*/
public ScopeFormats() {
- this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+ this.jobManagerFormat = new JobManagerScopeFormat(MetricOptions.SCOPE_NAMING_JM.defaultValue());
this.jobManagerJobFormat = new JobManagerJobScopeFormat(
- ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
+ MetricOptions.SCOPE_NAMING_JM_JOB.defaultValue(), this.jobManagerFormat);
- this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+ this.taskManagerFormat = new TaskManagerScopeFormat(MetricOptions.SCOPE_NAMING_TM.defaultValue());
this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
- ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+ MetricOptions.SCOPE_NAMING_TM_JOB.defaultValue(), this.taskManagerFormat);
this.taskFormat = new TaskScopeFormat(
- ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat);
+ MetricOptions.SCOPE_NAMING_TASK.defaultValue(), this.taskManagerJobFormat);
this.operatorFormat = new OperatorScopeFormat(
- ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat);
+ MetricOptions.SCOPE_NAMING_OPERATOR.defaultValue(), this.taskFormat);
}
/**
@@ -135,18 +135,12 @@ public class ScopeFormats {
* @return The ScopeFormats parsed from the configuration
*/
public static ScopeFormats fromConfig(Configuration config) {
- String jmFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
- String jmJobFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
- String tmFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
- String tmJobFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
- String taskFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
- String operatorFormat = config.getString(
- ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+ String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
+ String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+ String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
+ String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+ String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
+ String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index fe29ccb..b9502b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
@@ -69,7 +70,7 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterInstantiation() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -97,7 +98,7 @@ public class MetricRegistryTest extends TestLogger {
public void testMultipleReporterInstantiation() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3");
+ config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
@@ -147,7 +148,7 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterArgumentForwarding() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
@@ -172,7 +173,7 @@ public class MetricRegistryTest extends TestLogger {
public void testReporterScheduling() throws InterruptedException {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -215,7 +216,7 @@ public class MetricRegistryTest extends TestLogger {
@Test
public void testReporterNotifications() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
@@ -278,10 +279,10 @@ public class MetricRegistryTest extends TestLogger {
public void testScopeConfig() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B");
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C");
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+ config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+ config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+ config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
ScopeFormats scopeConfig = MetricRegistryConfiguration.createScopeConfig(config);
@@ -294,8 +295,8 @@ public class MetricRegistryTest extends TestLogger {
@Test
public void testConfigurableDelimiter() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E");
+ config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -308,7 +309,7 @@ public class MetricRegistryTest extends TestLogger {
@Test
public void testConfigurableDelimiterForReporters() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3");
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -331,7 +332,7 @@ public class MetricRegistryTest extends TestLogger {
@Test
public void testConfigurableDelimiterForReportersInGroup() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3,test4");
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -339,7 +340,7 @@ public class MetricRegistryTest extends TestLogger {
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
List<MetricReporter> reporters = registry.getReporters();
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index da14bfd..04e40ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
@@ -77,8 +78,8 @@ public class AbstractMetricGroupTest {
@Test
public void testScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
@@ -166,7 +167,7 @@ public class AbstractMetricGroupTest {
@Test
public void testScopeGenerationWithoutReporters() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 317d19e..7834755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -110,7 +110,7 @@ public class JobManagerGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index ed4056f..d8bd57a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -54,8 +54,8 @@ public class JobManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
+ cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
@@ -77,8 +77,8 @@ public class JobManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustomWildcard() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
+ cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index f121b59..ace0236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
@@ -41,7 +42,7 @@ public class MetricGroupRegistrationTest {
@Test
public void testMetricInstantiation() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 965e8fb..11f3c0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -161,7 +161,7 @@ public class TaskManagerGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9bef6c..f9891cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -54,8 +54,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
@@ -76,8 +76,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustomWildcard() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter.<tm_id>");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index eb906d0..183237a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -63,9 +63,9 @@ public class TaskMetricGroupTest extends TestLogger {
@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def");
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
@@ -90,7 +90,7 @@ public class TaskMetricGroupTest extends TestLogger {
@Test
public void testGenerateScopeWilcard() {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
+ cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
AbstractID executionId = new AbstractID();
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7569170..99c3f4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -177,10 +177,10 @@ public abstract class AbstractStreamOperator<OUT>
((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
}
Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
- int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+ int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {
- LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
- historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+ LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+ historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
}
latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
[06/13] flink git commit: [FLINK-6210] [rocksdb] Close RocksDB in
ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark
Posted by se...@apache.org.
[FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark
This closes #3652
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea054a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea054a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea054a7d
Branch: refs/heads/master
Commit: ea054a7d3bc452d153c070b2789ddbe6a2f080a7
Parents: 7a70524
Author: mengji.fy <me...@taobao.com>
Authored: Thu Mar 30 11:12:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:00:32 2017 +0200
----------------------------------------------------------------------
.../ListViaMergeSpeedMiniBenchmark.java | 73 ++++++++-------
.../ListViaRangeSpeedMiniBenchmark.java | 99 +++++++++++---------
2 files changed, 93 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
index 2a530e1..f3e084f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -53,52 +53,59 @@ public class ListViaMergeSpeedMiniBenchmark {
final String key = "key";
final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
- final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+ try {
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
- final int num = 50000;
+ final int num = 50000;
- // ----- insert -----
- System.out.println("begin insert");
+ // ----- insert -----
+ System.out.println("begin insert");
- final long beginInsert = System.nanoTime();
- for (int i = 0; i < num; i++) {
- rocksDB.merge(write_options, keyBytes, valueBytes);
- }
- final long endInsert = System.nanoTime();
- System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ rocksDB.merge(write_options, keyBytes, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
- // ----- read (attempt 1) -----
+ // ----- read (attempt 1) -----
- final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
- final long beginGet1 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet1 = System.nanoTime();
+ final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+ final long beginGet1 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet1 = System.nanoTime();
- System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+ System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
- // ----- read (attempt 2) -----
+ // ----- read (attempt 2) -----
- final long beginGet2 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet2 = System.nanoTime();
+ final long beginGet2 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet2 = System.nanoTime();
- System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+ System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
- // ----- compact -----
- System.out.println("compacting...");
- final long beginCompact = System.nanoTime();
- rocksDB.compactRange();
- final long endCompact = System.nanoTime();
+ // ----- compact -----
+ System.out.println("compacting...");
+ final long beginCompact = System.nanoTime();
+ rocksDB.compactRange();
+ final long endCompact = System.nanoTime();
- System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+ System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
- // ----- read (attempt 3) -----
+ // ----- read (attempt 3) -----
- final long beginGet3 = System.nanoTime();
- rocksDB.get(keyBytes, resultHolder);
- final long endGet3 = System.nanoTime();
+ final long beginGet3 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet3 = System.nanoTime();
- System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+ System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+ } finally {
+ rocksDB.close();
+ options.close();
+ write_options.close();
+ FileUtils.deleteDirectory(rocksDir);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
index b427ef1..f46e2cd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -56,61 +56,68 @@ public class ListViaRangeSpeedMiniBenchmark {
final String key = "key";
final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ try {
- final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
- final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+ final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
- final Unsafe unsafe = MemoryUtils.UNSAFE;
- final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+ final Unsafe unsafe = MemoryUtils.UNSAFE;
+ final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
- final int num = 50000;
- System.out.println("begin insert");
+ final int num = 50000;
+ System.out.println("begin insert");
- final long beginInsert = System.nanoTime();
- for (int i = 0; i < num; i++) {
- unsafe.putInt(keyTemplate, offset, i);
- rocksDB.put(write_options, keyTemplate, valueBytes);
- }
- final long endInsert = System.nanoTime();
- System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
-
- final byte[] resultHolder = new byte[num * valueBytes.length];
-
- final long beginGet = System.nanoTime();
-
- final RocksIterator iterator = rocksDB.newIterator();
- int pos = 0;
-
- // seek to start
- unsafe.putInt(keyTemplate, offset, 0);
- iterator.seek(keyTemplate);
-
- // mark end
- unsafe.putInt(keyTemplate, offset, -1);
-
- // iterate
- while (iterator.isValid()) {
- byte[] currKey = iterator.key();
- if (samePrefix(keyBytes, currKey)) {
- byte[] currValue = iterator.value();
- System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
- pos += currValue.length;
- iterator.next();
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ unsafe.putInt(keyTemplate, offset, i);
+ rocksDB.put(write_options, keyTemplate, valueBytes);
}
- else {
- break;
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+ final byte[] resultHolder = new byte[num * valueBytes.length];
+
+ final long beginGet = System.nanoTime();
+
+ final RocksIterator iterator = rocksDB.newIterator();
+ int pos = 0;
+
+ try {
+ // seek to start
+ unsafe.putInt(keyTemplate, offset, 0);
+ iterator.seek(keyTemplate);
+
+ // mark end
+ unsafe.putInt(keyTemplate, offset, -1);
+
+ // iterate
+ while (iterator.isValid()) {
+ byte[] currKey = iterator.key();
+ if (samePrefix(keyBytes, currKey)) {
+ byte[] currValue = iterator.value();
+ System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+ pos += currValue.length;
+ iterator.next();
+ } else {
+ break;
+ }
+ }
+ }finally {
+ iterator.close();
}
- }
- final long endGet = System.nanoTime();
+ final long endGet = System.nanoTime();
- System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
-
- // WriteOptions and RocksDB ultimately extends AbstractNativeReference, so we need to close resource as well.
- write_options.close();
- rocksDB.close();
+ System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+ } finally {
+ rocksDB.close();
+ options.close();
+ write_options.close();
+ FileUtils.deleteDirectory(rocksDir);
+ }
}
private static boolean samePrefix(byte[] prefix, byte[] key) {
[13/13] flink git commit: [FLINK-5672] [scripts] Add special cases
for a local setup in cluster start/stop scripts
Posted by se...@apache.org.
[FLINK-5672] [scripts] Add special cases for a local setup in cluster start/stop scripts
This way, if all slaves refer to "localhost", we do not require ssh at all.
This closes #3298
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83061ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83061ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83061ad0
Branch: refs/heads/master
Commit: 83061ad0f34ab158b6aa7924bba1aa35695817f4
Parents: d6a1551
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 13 15:33:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 13:30:29 2017 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/bin/config.sh | 33 +++++++++++++++++++-
.../src/main/flink-bin/bin/start-cluster.sh | 14 ++-------
.../src/main/flink-bin/bin/stop-cluster.sh | 14 ++-------
3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index dbfdd0e..a16529c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -410,16 +410,47 @@ readSlaves() {
SLAVES=()
+ SLAVES_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOST=$( extractHostName $line)
- if [ -n "$HOST" ]; then
+ if [ -n "$HOST" ] ; then
SLAVES+=(${HOST})
+ if [ "${HOST}" != "localhost" ] ; then
+ SLAVES_ALL_LOCALHOST=false
+ fi
fi
done < "$SLAVES_FILE"
}
+# starts or stops TMs on all slaves
+# TMSlaves start|stop
+TMSlaves() {
+ CMD=$1
+
+ readSlaves
+
+ if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
+ # all-local setup
+ for slave in ${SLAVES[@]}; do
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ done
+ else
+ # non-local setup
+ # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+ command -v pdsh >/dev/null 2>&1
+ if [[ $? -ne 0 ]]; then
+ for slave in ${SLAVES[@]}; do
+ ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ done
+ else
+ PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ fi
+ fi
+}
+
useOffHeapMemory() {
[[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 7611189..5d2f92b 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -49,15 +49,5 @@ else
fi
shopt -u nocasematch
-# Start TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
-readSlaves
-
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
- for slave in ${SLAVES[@]}; do
- ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start &"
- done
-else
- PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND="${FLINK_SSH_OPTS}" pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start"
-fi
+# Start TaskManager instance(s)
+TMSlaves start
http://git-wip-us.apache.org/repos/asf/flink/blob/83061ad0/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index bc86291..e04d2fa 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -22,18 +22,8 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
-# Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
-readSlaves
-
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
- for slave in ${SLAVES[@]}; do
- ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
- done
-else
- PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
-fi
+# Stop TaskManager instance(s)
+TMSlaves stop
# Stop JobManager instance(s)
shopt -s nocasematch
[10/13] flink git commit: [FLINK-5962] [runtime-web] [tests] Replace
deprecated usage of JsonNode#getValueAsText
Posted by se...@apache.org.
[FLINK-5962] [runtime-web] [tests] Replace deprecated usage of JsonNode#getValueAsText
This closes #3679
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cad3097
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cad3097
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cad3097
Branch: refs/heads/master
Commit: 3cad30974e3bf16d1604096f2b0e138768ae795b
Parents: 1b77cc3
Author: zentol <ch...@apache.org>
Authored: Wed Apr 5 23:50:12 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 12:32:32 2017 +0200
----------------------------------------------------------------------
...obCancellationWithSavepointHandlersTest.java | 44 ++++++++++----------
1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3cad3097/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 8b948aa..8c2d3fc 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import akka.dispatch.ExecutionContexts$;
import akka.dispatch.Futures;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -32,8 +34,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
@@ -201,9 +201,9 @@ public class JobCancellationWithSavepointHandlersTest {
String json = response.content().toString(Charset.forName("UTF-8"));
JsonNode root = new ObjectMapper().readTree(json);
- assertEquals("accepted", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
- assertEquals(location, root.get("location").getValueAsText());
+ assertEquals("accepted", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals(location, root.get("location").asText());
// Trigger again
response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
@@ -215,9 +215,9 @@ public class JobCancellationWithSavepointHandlersTest {
json = response.content().toString(Charset.forName("UTF-8"));
root = new ObjectMapper().readTree(json);
- assertEquals("accepted", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
- assertEquals(location, root.get("location").getValueAsText());
+ assertEquals("accepted", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals(location, root.get("location").asText());
// Only single actual request
verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
@@ -233,8 +233,8 @@ public class JobCancellationWithSavepointHandlersTest {
json = response.content().toString(Charset.forName("UTF-8"));
root = new ObjectMapper().readTree(json);
- assertEquals("in-progress", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
+ assertEquals("in-progress", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
// Complete
promise.success(new CancellationSuccess(jobId, "_path-savepoint_"));
@@ -249,9 +249,9 @@ public class JobCancellationWithSavepointHandlersTest {
root = new ObjectMapper().readTree(json);
- assertEquals("success", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
- assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+ assertEquals("success", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
// Query again, keep recent history
@@ -265,9 +265,9 @@ public class JobCancellationWithSavepointHandlersTest {
root = new ObjectMapper().readTree(json);
- assertEquals("success", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
- assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText());
+ assertEquals("success", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
// Query for unknown request
params.put("requestId", "9929");
@@ -281,9 +281,9 @@ public class JobCancellationWithSavepointHandlersTest {
root = new ObjectMapper().readTree(json);
- assertEquals("failed", root.get("status").getValueAsText());
- assertEquals("9929", root.get("request-id").getValueAsText());
- assertEquals("Unknown job/request ID", root.get("cause").getValueAsText());
+ assertEquals("failed", root.get("status").asText());
+ assertEquals("9929", root.get("request-id").asText());
+ assertEquals("Unknown job/request ID", root.get("cause").asText());
}
/**
@@ -327,8 +327,8 @@ public class JobCancellationWithSavepointHandlersTest {
String json = response.content().toString(Charset.forName("UTF-8"));
JsonNode root = new ObjectMapper().readTree(json);
- assertEquals("failed", root.get("status").getValueAsText());
- assertEquals("1", root.get("request-id").getValueAsText());
- assertEquals("Test Exception", root.get("cause").getValueAsText());
+ assertEquals("failed", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals("Test Exception", root.get("cause").asText());
}
}
[02/13] flink git commit: [hotfix] [docs] Typo sick -> sink
Posted by se...@apache.org.
[hotfix] [docs] Typo sick -> sink
This closes #3749
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f75466fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f75466fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f75466fe
Branch: refs/heads/master
Commit: f75466fecf494e5da700488144ae79667f2ed27e
Parents: c974684
Author: Marc Tremblay <ma...@gmail.com>
Authored: Fri Apr 21 00:22:14 2017 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 21 10:16:24 2017 +0200
----------------------------------------------------------------------
docs/dev/connectors/filesystem_sink.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f75466fe/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 67250f0..d12752f 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -44,7 +44,7 @@ cluster execution.
#### Bucketing File Sink
The bucketing behaviour as well as the writing can be configured but we will get to that later.
-This is how you can create a bucketing sick which by default, sinks to rolling files that are split by time:
+This is how you can create a bucketing sink which by default, sinks to rolling files that are split by time:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">