You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/11 10:05:23 UTC

[1/9] flink git commit: [FLINK-9743][client] Use correct zip path separator for nested jars

Repository: flink
Updated Branches:
  refs/heads/master cde504eb4 -> c3b013b9d


[FLINK-9743][client] Use correct zip path separator for nested jars

This closes #6263.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fbe562d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fbe562d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fbe562d

Branch: refs/heads/master
Commit: 2fbe562d4db93ecef8b5cf4a3f477148d43208f6
Parents: cde504e
Author: snuyanzin <sn...@gmail.com>
Authored: Thu Jul 5 11:58:33 2018 +0300
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:06 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/PackagedProgram.java   |  4 ++-
 .../client/program/PackagedProgramTest.java     | 28 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fbe562d/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index fbc8187..c25d943 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -696,7 +696,9 @@ public class PackagedProgram {
 					for (int i = 0; i < containedJarFileEntries.size(); i++) {
 						final JarEntry entry = containedJarFileEntries.get(i);
 						String name = entry.getName();
-						name = name.replace(File.separatorChar, '_');
+						// entry names inside zip/jar archives always use '/' as the path separator,
+						// and java.util.zip.ZipEntry#isDirectory only checks for '/', never File.separator
+						name = name.replace('/', '_');
 
 						File tempFile;
 						try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fbe562d/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index e137ada..355e663 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -19,18 +19,29 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.configuration.ConfigConstants;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
 
 /**
  * Tests for the {@link PackagedProgramTest}.
  */
 public class PackagedProgramTest {
 
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testGetPreviewPlan() {
 		try {
@@ -56,6 +67,23 @@ public class PackagedProgramTest {
 		}
 	}
 
+	@Test
+	public void testExtractContainedLibraries() throws Exception {
+		// build an outer jar that holds a single nested "jar" entry below lib/
+		final byte[] expectedContent = "testExtractContainedLibraries".getBytes(ConfigConstants.DEFAULT_CHARSET);
+		final File outerJar = temporaryFolder.newFile("test.jar");
+		try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(outerJar))) {
+			zipOut.putNextEntry(new ZipEntry("lib/internalTest.jar"));
+			zipOut.write(expectedContent);
+			zipOut.closeEntry();
+		}
+
+		final List<File> extracted = PackagedProgram.extractContainedLibraries(outerJar.toURI().toURL());
+		Assert.assertEquals(1, extracted.size());
+		// exactly one library is expected, carrying the bytes written above
+		Assert.assertArrayEquals(expectedContent, Files.readAllBytes(extracted.get(0).toPath()));
+	}
+
 	private static final class NullOutputStream extends java.io.OutputStream {
 		@Override
 		public void write(int b) {}


[3/9] flink git commit: [FLINK-9789][metrics] Ensure uniqueness of watermark metrics

Posted by ch...@apache.org.
[FLINK-9789][metrics] Ensure uniqueness of watermark metrics

This closes #6292.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60df251a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60df251a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60df251a

Branch: refs/heads/master
Commit: 60df251ad34ac033ed6c4423a69765739cb04199
Parents: 2fbe562
Author: zentol <ch...@apache.org>
Authored: Tue Jul 10 13:27:34 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:07 2018 +0200

----------------------------------------------------------------------
 .../util/InterceptingTaskMetricGroup.java       | 53 ++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTask.java       |  3 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  5 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  3 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 18 ++++++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 22 +++++++-
 6 files changed, 98 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
new file mode 100644
index 0000000..29454b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+	private Map<String, Metric> intercepted;
+
+	/**
+	 * Returns the registered metric for the given name, or null if it was never registered.
+	 * @param name metric name
+	 * @return registered metric for the given name, or null if it was never registered
+	 */
+	public Metric get(String name) {
+		// the map is only created by the first addMetric call, so it may still be null here
+		return intercepted == null ? null : intercepted.get(name);
+	}
+
+	@Override
+	protected void addMetric(String name, Metric metric) {
+		if (intercepted == null) {
+			intercepted = new HashMap<>();
+		}
+		intercepted.put(name, metric);
+		super.addMetric(name, metric);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 43eab24..7498518 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -93,7 +93,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					inputWatermarkGauge);
 		}
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
-		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+		// wrap watermark gauge since registered metrics must be unique
+		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index c105ad7..015b7db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -381,8 +381,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
 		}
 
-		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge());
-		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge());
+		// wrap watermark gauges since registered metrics must be unique
+		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
+		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
 
 		return currentOperatorOutput;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 93a5675..546ccdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -105,7 +105,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
-		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
+		// wrap watermark gauge since registered metrics must be unique
+		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index af776d5..201e138 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -64,6 +65,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -678,7 +680,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-		TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
 			public OperatorMetricGroup addOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
@@ -702,11 +704,23 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning();
 
+		Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 		Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+		Assert.assertEquals("A metric was registered multiple times.",
+			5,
+			new HashSet<>(Arrays.asList(
+				taskInputWatermarkGauge,
+				headInputWatermarkGauge,
+				headOutputWatermarkGauge,
+				chainedInputWatermarkGauge,
+				chainedOutputWatermarkGauge))
+				.size());
+
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
@@ -714,6 +728,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		testHarness.processElement(new Watermark(1L));
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
@@ -721,6 +736,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		testHarness.processElement(new Watermark(2L));
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 38b262c..5d15157 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -49,6 +50,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -465,7 +468,7 @@ public class TwoInputStreamTaskTest {
 
 		InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-		TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
 			public OperatorMetricGroup addOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
@@ -489,6 +492,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning();
 
+		Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
 		Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
 		Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
@@ -496,6 +500,19 @@ public class TwoInputStreamTaskTest {
 		Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+		Assert.assertEquals("A metric was registered multiple times.",
+			7,
+			new HashSet<>(Arrays.asList(
+				taskInputWatermarkGauge,
+				headInput1WatermarkGauge,
+				headInput2WatermarkGauge,
+				headInputWatermarkGauge,
+				headOutputWatermarkGauge,
+				chainedInputWatermarkGauge,
+				chainedOutputWatermarkGauge))
+				.size());
+
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -505,6 +522,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(1L), 0, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -514,6 +532,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(2L), 1, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
@@ -523,6 +542,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(3L), 0, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());


[7/9] flink git commit: [FLINK-9754][release] Remove references to scala profiles

Posted by ch...@apache.org.
[FLINK-9754][release] Remove references to scala profiles

This closes #6286.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3b013b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3b013b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3b013b9

Branch: refs/heads/master
Commit: c3b013b9d0c2e5f4941ddeff18084d090c066440
Parents: 04d0009
Author: zentol <ch...@apache.org>
Authored: Mon Jul 9 12:48:13 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200

----------------------------------------------------------------------
 tools/releasing/create_binary_release.sh | 2 +-
 tools/releasing/deploy_staging_jars.sh   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3b013b9/tools/releasing/create_binary_release.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh
index ac2619c..374c785 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -60,7 +60,7 @@ make_binary_release() {
   fi
 
   # enable release profile here (to check for the maven version)
-  $MVN clean package $FLAGS -Prelease,scala-${SCALA_VERSION} -pl flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
+  $MVN clean package $FLAGS -Prelease -pl flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
 
   cd flink-dist/target/flink-*-bin/
   tar czf "${dir_name}.tgz" flink-*

http://git-wip-us.apache.org/repos/asf/flink/blob/c3b013b9/tools/releasing/deploy_staging_jars.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/deploy_staging_jars.sh b/tools/releasing/deploy_staging_jars.sh
index 9245bdd..1bc20d0 100755
--- a/tools/releasing/deploy_staging_jars.sh
+++ b/tools/releasing/deploy_staging_jars.sh
@@ -41,5 +41,5 @@ cd ..
 echo "Deploying to repository.apache.org"
 
 echo "Deploying Scala 2.11 version"
-$MVN clean deploy -Prelease,docs-and-source,scala-2.11 -DskipTests -DretryFailedDeploymentCount=10
+$MVN clean deploy -Prelease,docs-and-source -DskipTests -DretryFailedDeploymentCount=10
 


[6/9] flink git commit: [hotfix][rat] Add *.snapshot files to RAT exclusion list

Posted by ch...@apache.org.
[hotfix][rat] Add *.snapshot files to RAT exclusion list

This closes #6295.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0cb1fe2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0cb1fe2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0cb1fe2f

Branch: refs/heads/master
Commit: 0cb1fe2f892bf631473ff2c06db6ab2d875ffd99
Parents: 04a7cd4
Author: an4828 <ne...@att.com>
Authored: Tue Jul 10 10:13:58 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0cb1fe2f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1c04285..f3aedda 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1095,6 +1095,7 @@ under the License.
 
 						<!-- snapshots -->
 						<exclude>**/src/test/resources/*-snapshot</exclude>
+						<exclude>**/src/test/resources/*.snapshot</exclude>
 						<exclude>**/src/test/resources/*-savepoint</exclude>
 						<exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude>
 						<exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude>


[9/9] flink git commit: [FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink

Posted by ch...@apache.org.
[FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink

This closes #6164.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04a7cd4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04a7cd4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04a7cd4d

Branch: refs/heads/master
Commit: 04a7cd4d9232e757560e156a684e224d54b71176
Parents: 5dbb6dd
Author: sihuazhou <su...@163.com>
Authored: Thu Jun 14 18:12:20 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/fs/RollingSink.java       | 11 ++++-------
 .../streaming/connectors/fs/bucketing/BucketingSink.java | 11 ++++-------
 2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 709a7c9..9ec97b7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -533,12 +533,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
 			}
 
 			// verify that truncate actually works
-			FSDataOutputStream outputStream;
 			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
+			try (FSDataOutputStream outputStream = fs.create(testPath)) {
 				outputStream.writeUTF("hello");
-				outputStream.close();
 			} catch (IOException e) {
 				LOG.error("Could not create file for checking if truncate works.", e);
 				throw new RuntimeException("Could not create file for checking if truncate works.", e);
@@ -702,9 +699,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-						lengthFileOut.close();
+						try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+						}
 					}
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 34fb1b7..e55aff5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -631,12 +631,9 @@ public class BucketingSink<T>
 			}
 
 			// verify that truncate actually works
-			FSDataOutputStream outputStream;
 			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
+			try (FSDataOutputStream outputStream = fs.create(testPath)) {
 				outputStream.writeUTF("hello");
-				outputStream.close();
 			} catch (IOException e) {
 				LOG.error("Could not create file for checking if truncate works.", e);
 				throw new RuntimeException("Could not create file for checking if truncate works. " +
@@ -880,9 +877,9 @@ public class BucketingSink<T>
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
-						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-						lengthFileOut.writeUTF(Long.toString(validLength));
-						lengthFileOut.close();
+						try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+							lengthFileOut.writeUTF(Long.toString(validLength));
+						}
 					}
 				}
 


[2/9] flink git commit: [FLINK-9187][metrics] Add Prometheus PushGateway reporter

Posted by ch...@apache.org.
[FLINK-9187][metrics] Add Prometheus PushGateway reporter

This closes #6184.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ee5dbf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ee5dbf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ee5dbf3

Branch: refs/heads/master
Commit: 5ee5dbf3dd9e5240ed13c8c3eaff6cca158010b3
Parents: 60df251
Author: lamber-ken <!@#123zxcQ>
Authored: Wed Jun 20 12:26:10 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:07 2018 +0200

----------------------------------------------------------------------
 ...eus_push_gateway_reporter_configuration.html |  36 +++
 docs/monitoring/metrics.md                      |  26 ++
 flink-docs/pom.xml                              |   5 +
 .../ConfigOptionsDocGenerator.java              |   1 +
 flink-metrics/flink-metrics-prometheus/pom.xml  |   6 +
 .../prometheus/AbstractPrometheusReporter.java  | 306 +++++++++++++++++++
 .../PrometheusPushGatewayReporter.java          |  91 ++++++
 .../PrometheusPushGatewayReporterOptions.java   |  53 ++++
 .../metrics/prometheus/PrometheusReporter.java  | 271 +---------------
 .../prometheus/PrometheusReporterTest.java      |   8 +-
 10 files changed, 532 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html b/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
new file mode 100644
index 0000000..2d14b50
--- /dev/null
+++ b/docs/_includes/generated/prometheus_push_gateway_reporter_configuration.html
@@ -0,0 +1,36 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>deleteOnShutdown</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Specifies whether to delete metrics from the PushGateway on shutdown.</td>
+        </tr>
+        <tr>
+            <td><h5>host</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The PushGateway server host.</td>
+        </tr>
+        <tr>
+            <td><h5>jobName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The job name under which metrics will be pushed</td>
+        </tr>
+        <tr>
+            <td><h5>port</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The PushGateway server port.</td>
+        </tr>
+        <tr>
+            <td><h5>randomJobNameSuffix</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Specifies whether a random suffix should be appended to the job name.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index a06e7f3..55f626e 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -700,6 +700,32 @@ Flink metric types are mapped to Prometheus metric types as follows:
 
 All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
+### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+{% include generated/prometheus_push_gateway_reporter_configuration.html %}
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.host: localhost
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.jobName: myJob
+metrics.reporter.promgateway.randomJobNameSuffix: true
+metrics.reporter.promgateway.deleteOnShutdown: false
+
+{% endhighlight %}
+
+The PrometheusPushGatewayReporter pushes metrics to a [Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped by Prometheus.
+
+Please see the [Prometheus documentation](https://prometheus.io/docs/practices/pushing/) for use-cases.
+
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
 In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index dfe33be..2135dea 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -54,6 +54,11 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-prometheus</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
 			<!-- necessary for loading the web-submission extension -->
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index aca722e..743f49f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -56,6 +56,7 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
+		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
 	};
 
 	static final String DEFAULT_PATH_PREFIX = "src/main/java";

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml
index b0cad84..9aad69c 100644
--- a/flink-metrics/flink-metrics-prometheus/pom.xml
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -73,6 +73,12 @@ under the License.
 			<version>${prometheus.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>simpleclient_pushgateway</artifactId>
+			<version>${prometheus.version}</version>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
new file mode 100644
index 0000000..426cd4c
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+/**
+ * Base class for reporters that export Flink metrics to Prometheus.
+ */
+@PublicEvolving
+public abstract class AbstractPrometheusReporter implements MetricReporter {
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return replaceInvalidChars(input);
+		}
+	};
+
+	private static final char SCOPE_SEPARATOR = '_';
+	private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
+	private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>();
+
+	@VisibleForTesting
+	static String replaceInvalidChars(final String input) {
+		// https://prometheus.io/docs/instrumenting/writing_exporters/
+		// Only [a-zA-Z0-9:_] are valid in metric names. Any other character is sanitized to an underscore.
+		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+	}
+
+	@Override
+	public void close() {
+		CollectorRegistry.defaultRegistry.clear();
+	}
+
+	@Override
+	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+
+		List<String> dimensionKeys = new LinkedList<>();
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			final String key = dimension.getKey();
+			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
+		final String scopedMetricName = getScopedName(metricName, group);
+		final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")";
+
+		final Collector collector;
+		Integer count = 0;
+
+		synchronized (this) {
+			if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
+				final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+				collector = collectorWithCount.getKey();
+				count = collectorWithCount.getValue();
+			} else {
+				collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString);
+				try {
+					collector.register();
+				} catch (Exception e) {
+					log.warn("There was a problem registering metric {}.", metricName, e);
+				}
+			}
+			addMetric(metric, dimensionValues, collector);
+			collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
+		}
+	}
+
+	private static String getScopedName(String metricName, MetricGroup group) {
+		return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+	}
+
+	private Collector createCollector(Metric metric, List<String> dimensionKeys, List<String> dimensionValues, String scopedMetricName, String helpString) {
+		Collector collector;
+		if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) {
+			collector = io.prometheus.client.Gauge
+				.build()
+				.name(scopedMetricName)
+				.help(helpString)
+				.labelNames(toArray(dimensionKeys))
+				.create();
+		} else if (metric instanceof Histogram) {
+			collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
+		} else {
+			log.warn("Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+			collector = null;
+		}
+		return collector;
+	}
+
+	private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
+		if (metric instanceof Gauge) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
+		} else if (metric instanceof Counter) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+		} else if (metric instanceof Meter) {
+			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+		} else if (metric instanceof Histogram) {
+			((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
+		} else {
+			log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+		}
+	}
+
+	private void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
+		if (metric instanceof Gauge) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Counter) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Meter) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Histogram) {
+			((HistogramSummaryProxy) collector).remove(dimensionValues);
+		} else {
+			log.warn("Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
+		final String scopedMetricName = getScopedName(metricName, group);
+		synchronized (this) {
+			final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+			final Integer count = collectorWithCount.getValue();
+			final Collector collector = collectorWithCount.getKey();
+
+			removeMetric(metric, dimensionValues, collector);
+
+			if (count == 1) {
+				try {
+					CollectorRegistry.defaultRegistry.unregister(collector);
+				} catch (Exception e) {
+					log.warn("There was a problem unregistering metric {}.", scopedMetricName, e);
+				}
+				collectorsWithCountByMetricName.remove(scopedMetricName);
+			} else {
+				collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static String getLogicalScope(MetricGroup group) {
+		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+	}
+
+	@VisibleForTesting
+	io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				final Object value = gauge.getValue();
+				if (value == null) {
+					log.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
+					return 0;
+				}
+				if (value instanceof Double) {
+					return (double) value;
+				}
+				if (value instanceof Number) {
+					return ((Number) value).doubleValue();
+				}
+				if (value instanceof Boolean) {
+					return ((Boolean) value) ? 1 : 0;
+				}
+				log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+					gauge, value.getClass().getName());
+				return 0;
+			}
+		};
+	}
+
+	private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return (double) counter.getCount();
+			}
+		};
+	}
+
+	private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+		return new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return meter.getRate();
+			}
+		};
+	}
+
+	@VisibleForTesting
+	static class HistogramSummaryProxy extends Collector {
+		static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
+
+		private final String metricName;
+		private final String helpString;
+		private final List<String> labelNamesWithQuantile;
+
+		private final Map<List<String>, Histogram> histogramsByLabelValues = new HashMap<>();
+
+		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List<String> labelNames, final List<String> labelValues) {
+			this.metricName = metricName;
+			this.helpString = helpString;
+			this.labelNamesWithQuantile = addToList(labelNames, "quantile");
+			histogramsByLabelValues.put(labelValues, histogram);
+		}
+
+		@Override
+		public List<MetricFamilySamples> collect() {
+			// We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
+			// whose snapshot's values array only holds a sample of recent values).
+
+			List<MetricFamilySamples.Sample> samples = new LinkedList<>();
+			for (Map.Entry<List<String>, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
+				addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples);
+			}
+			return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
+		}
+
+		void addChild(final Histogram histogram, final List<String> labelValues) {
+			histogramsByLabelValues.put(labelValues, histogram);
+		}
+
+		void remove(final List<String> labelValues) {
+			histogramsByLabelValues.remove(labelValues);
+		}
+
+		private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
+			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
+				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+			for (final Double quantile : QUANTILES) {
+				samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+					addToList(labelValues, quantile.toString()),
+					histogram.getStatistics().getQuantile(quantile)));
+			}
+		}
+	}
+
+	private static List<String> addToList(List<String> list, String element) {
+		final List<String> result = new ArrayList<>(list);
+		result.add(element);
+		return result;
+	}
+
+	private static String[] toArray(List<String> list) {
+		return list.toArray(new String[list.size()]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
new file mode 100644
index 0000000..164eaf3
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+
+import java.io.IOException;
+
+import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus {@link PushGateway}.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+
+	private PushGateway pushGateway;
+	private String jobName;
+	private boolean deleteOnShutdown;
+
+	@Override
+	public void open(MetricConfig config) {
+		String host = config.getString(HOST.key(), HOST.defaultValue());
+		int port = config.getInteger(PORT.key(), PORT.defaultValue());
+		String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue());
+		boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue());
+		deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue());
+
+		if (host == null || host.isEmpty() || port < 1) {
+			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+		}
+
+		if (randomSuffix) {
+			this.jobName = configuredJobName + new AbstractID();
+		} else {
+			this.jobName = configuredJobName;
+		}
+
+		pushGateway = new PushGateway(host + ':' + port);
+		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", host, port, jobName, randomSuffix, deleteOnShutdown);
+	}
+
+	@Override
+	public void report() {
+		try {
+			pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
+		} catch (Exception e) {
+			log.warn("Failed to push metrics to PushGateway with jobName {}.", jobName, e);
+		}
+	}
+
+	@Override
+	public void close() {
+		if (deleteOnShutdown && pushGateway != null) {
+			try {
+				pushGateway.delete(jobName);
+			} catch (IOException e) {
+				log.warn("Failed to delete metrics from PushGateway with jobName {}.", jobName, e);
+			}
+		}
+		super.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
new file mode 100644
index 0000000..74fe7cb
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Config options for the {@link PrometheusPushGatewayReporter}.
+ */
+public class PrometheusPushGatewayReporterOptions {
+
+	public static final ConfigOption<String> HOST = ConfigOptions
+		.key("host")
+		.noDefaultValue()
+		.withDescription("The PushGateway server host.");
+
+	public static final ConfigOption<Integer> PORT = ConfigOptions
+		.key("port")
+		.defaultValue(-1)
+		.withDescription("The PushGateway server port.");
+
+	public static final ConfigOption<String> JOB_NAME = ConfigOptions
+		.key("jobName")
+		.defaultValue("")
+		.withDescription("The job name under which metrics will be pushed");
+
+	public static final ConfigOption<Boolean> RANDOM_JOB_NAME_SUFFIX = ConfigOptions
+		.key("randomJobNameSuffix")
+		.defaultValue(true)
+		.withDescription("Specifies whether a random suffix should be appended to the job name.");
+
+	public static final ConfigOption<Boolean> DELETE_ON_SHUTDOWN = ConfigOptions
+		.key("deleteOnShutdown")
+		.defaultValue(true)
+		.withDescription("Specifies whether to delete metrics from the PushGateway on shutdown.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index ffa419c..190e120 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -20,62 +20,28 @@ package org.apache.flink.metrics.prometheus;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 
-import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
 
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
  */
 @PublicEvolving
-public class PrometheusReporter implements MetricReporter {
-	private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
+public class PrometheusReporter extends AbstractPrometheusReporter {
 
 	static final String ARG_PORT = "port";
 	private static final String DEFAULT_PORT = "9249";
 
-	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
-	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
-		@Override
-		public String filterCharacters(String input) {
-			return replaceInvalidChars(input);
-		}
-	};
-
-	private static final char SCOPE_SEPARATOR = '_';
-	private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
-
 	private HTTPServer httpServer;
 	private int port;
-	private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>();
 
 	@VisibleForTesting
 	int getPort() {
@@ -83,13 +49,6 @@ public class PrometheusReporter implements MetricReporter {
 		return port;
 	}
 
-	@VisibleForTesting
-	static String replaceInvalidChars(final String input) {
-		// https://prometheus.io/docs/instrumenting/writing_exporters/
-		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
-		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
-	}
-
 	@Override
 	public void open(MetricConfig config) {
 		String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
@@ -101,10 +60,10 @@ public class PrometheusReporter implements MetricReporter {
 				// internally accesses CollectorRegistry.defaultRegistry
 				httpServer = new HTTPServer(port);
 				this.port = port;
-				LOG.info("Started PrometheusReporter HTTP server on port {}.", port);
+				log.info("Started PrometheusReporter HTTP server on port {}.", port);
 				break;
 			} catch (IOException ioe) { //assume port conflict
-				LOG.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
+				log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
 			}
 		}
 		if (httpServer == null) {
@@ -117,230 +76,8 @@ public class PrometheusReporter implements MetricReporter {
 		if (httpServer != null) {
 			httpServer.stop();
 		}
-		CollectorRegistry.defaultRegistry.clear();
-	}
-
-	@Override
-	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
-
-		List<String> dimensionKeys = new LinkedList<>();
-		List<String> dimensionValues = new LinkedList<>();
-		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
-			final String key = dimension.getKey();
-			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
-			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
-		}
-
-		final String scopedMetricName = getScopedName(metricName, group);
-		final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")";
 
-		final Collector collector;
-		Integer count = 0;
-
-		synchronized (this) {
-			if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
-				final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
-				collector = collectorWithCount.getKey();
-				count = collectorWithCount.getValue();
-			} else {
-				collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString);
-				try {
-					collector.register();
-				} catch (Exception e) {
-					LOG.warn("There was a problem registering metric {}.", metricName, e);
-				}
-			}
-			addMetric(metric, dimensionValues, collector);
-			collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
-		}
-	}
-
-	private static String getScopedName(String metricName, MetricGroup group) {
-		return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+		super.close();
 	}
 
-	private static Collector createCollector(Metric metric, List<String> dimensionKeys, List<String> dimensionValues, String scopedMetricName, String helpString) {
-		Collector collector;
-		if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) {
-			collector = io.prometheus.client.Gauge
-				.build()
-				.name(scopedMetricName)
-				.help(helpString)
-				.labelNames(toArray(dimensionKeys))
-				.create();
-		} else if (metric instanceof Histogram) {
-			collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
-		} else {
-			LOG.warn("Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-				metric.getClass().getName());
-			collector = null;
-		}
-		return collector;
-	}
-
-	private static void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
-		if (metric instanceof Gauge) {
-			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
-		} else if (metric instanceof Counter) {
-			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
-		} else if (metric instanceof Meter) {
-			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
-		} else if (metric instanceof Histogram) {
-			((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
-		} else {
-			LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-				metric.getClass().getName());
-		}
-	}
-
-	private static void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
-		if (metric instanceof Gauge) {
-			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-		} else if (metric instanceof Counter) {
-			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-		} else if (metric instanceof Meter) {
-			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-		} else if (metric instanceof Histogram) {
-			((HistogramSummaryProxy) collector).remove(dimensionValues);
-		} else {
-			LOG.warn("Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-				metric.getClass().getName());
-		}
-	}
-
-	@Override
-	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
-
-		List<String> dimensionValues = new LinkedList<>();
-		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
-			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
-		}
-
-		final String scopedMetricName = getScopedName(metricName, group);
-		synchronized (this) {
-			final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
-			final Integer count = collectorWithCount.getValue();
-			final Collector collector = collectorWithCount.getKey();
-
-			removeMetric(metric, dimensionValues, collector);
-
-			if (count == 1) {
-				try {
-					CollectorRegistry.defaultRegistry.unregister(collector);
-				} catch (Exception e) {
-					LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e);
-				}
-				collectorsWithCountByMetricName.remove(scopedMetricName);
-			} else {
-				collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static String getLogicalScope(MetricGroup group) {
-		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
-	}
-
-	@VisibleForTesting
-	static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
-		return new io.prometheus.client.Gauge.Child() {
-			@Override
-			public double get() {
-				final Object value = gauge.getValue();
-				if (value == null) {
-					LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
-					return 0;
-				}
-				if (value instanceof Double) {
-					return (double) value;
-				}
-				if (value instanceof Number) {
-					return ((Number) value).doubleValue();
-				}
-				if (value instanceof Boolean) {
-					return ((Boolean) value) ? 1 : 0;
-				}
-				LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
-					gauge, value.getClass().getName());
-				return 0;
-			}
-		};
-	}
-
-	private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) {
-		return new io.prometheus.client.Gauge.Child() {
-			@Override
-			public double get() {
-				return (double) counter.getCount();
-			}
-		};
-	}
-
-	private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
-		return new io.prometheus.client.Gauge.Child() {
-			@Override
-			public double get() {
-				return meter.getRate();
-			}
-		};
-	}
-
-	@VisibleForTesting
-	static class HistogramSummaryProxy extends Collector {
-		static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
-
-		private final String metricName;
-		private final String helpString;
-		private final List<String> labelNamesWithQuantile;
-
-		private final Map<List<String>, Histogram> histogramsByLabelValues = new HashMap<>();
-
-		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List<String> labelNames, final List<String> labelValues) {
-			this.metricName = metricName;
-			this.helpString = helpString;
-			this.labelNamesWithQuantile = addToList(labelNames, "quantile");
-			histogramsByLabelValues.put(labelValues, histogram);
-		}
-
-		@Override
-		public List<MetricFamilySamples> collect() {
-			// We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
-			// whose snapshot's values array only holds a sample of recent values).
-
-			List<MetricFamilySamples.Sample> samples = new LinkedList<>();
-			for (Map.Entry<List<String>, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
-				addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples);
-			}
-			return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
-		}
-
-		void addChild(final Histogram histogram, final List<String> labelValues) {
-			histogramsByLabelValues.put(labelValues, histogram);
-		}
-
-		void remove(final List<String> labelValues) {
-			histogramsByLabelValues.remove(labelValues);
-		}
-
-		private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
-			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
-				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
-			for (final Double quantile : QUANTILES) {
-				samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
-					addToList(labelValues, quantile.toString()),
-					histogram.getStatistics().getQuantile(quantile)));
-			}
-		}
-	}
-
-	private static List<String> addToList(List<String> list, String element) {
-		final List<String> result = new ArrayList<>(list);
-		result.add(element);
-		return result;
-	}
-
-	private static String[] toArray(List<String> list) {
-		return list.toArray(new String[list.size()]);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ee5dbf3/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 592c246..890227f 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -202,7 +202,7 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void doubleGaugeIsConvertedCorrectly() {
-		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Double>() {
+		assertThat(reporter.gaugeFrom(new Gauge<Double>() {
 			@Override
 			public Double getValue() {
 				return 3.14;
@@ -212,7 +212,7 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void shortGaugeIsConvertedCorrectly() {
-		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Short>() {
+		assertThat(reporter.gaugeFrom(new Gauge<Short>() {
 			@Override
 			public Short getValue() {
 				return 13;
@@ -222,7 +222,7 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void booleanGaugeIsConvertedCorrectly() {
-		assertThat(PrometheusReporter.gaugeFrom(new Gauge<Boolean>() {
+		assertThat(reporter.gaugeFrom(new Gauge<Boolean>() {
 			@Override
 			public Boolean getValue() {
 				return true;
@@ -235,7 +235,7 @@ public class PrometheusReporterTest extends TestLogger {
 	 */
 	@Test
 	public void stringGaugeCannotBeConverted() {
-		assertThat(PrometheusReporter.gaugeFrom(new Gauge<String>() {
+		assertThat(reporter.gaugeFrom(new Gauge<String>() {
 			@Override
 			public String getValue() {
 				return "I am not a number";


[8/9] flink git commit: [FLINK-9768][release] Speed up binary release

Posted by ch...@apache.org.
[FLINK-9768][release] Speed up binary release

This closes #6285.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04d00098
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04d00098
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04d00098

Branch: refs/heads/master
Commit: 04d000981671b38d81a9a5ff385c4100e2cb2b13
Parents: 0cb1fe2
Author: zentol <ch...@apache.org>
Authored: Mon Jul 9 12:02:35 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200

----------------------------------------------------------------------
 tools/releasing/create_binary_release.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04d00098/tools/releasing/create_binary_release.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh
index 6e783cf..ac2619c 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -60,7 +60,7 @@ make_binary_release() {
   fi
 
   # enable release profile here (to check for the maven version)
-  $MVN clean package $FLAGS -DskipTests -Prelease,scala-${SCALA_VERSION} -Dgpg.skip
+  $MVN clean package $FLAGS -Prelease,scala-${SCALA_VERSION} -pl flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
 
   cd flink-dist/target/flink-*-bin/
   tar czf "${dir_name}.tgz" flink-*


[5/9] flink git commit: [FLINK-9666][refactor] Use short-circuit logic in boolean contexts

Posted by ch...@apache.org.
[FLINK-9666][refactor] Use short-circuit logic in boolean contexts

This closes #6230.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dbb6dda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dbb6dda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dbb6dda

Branch: refs/heads/master
Commit: 5dbb6dda8662a1b13314780c34463ba5c7f3c020
Parents: cad6e4d
Author: lamber-ken <!@#123zxcQ>
Authored: Sat Jun 30 10:22:09 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:09 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/environment/StreamExecutionEnvironment.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5dbb6dda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d7ca7e0..b7259de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1604,7 +1604,7 @@ public abstract class StreamExecutionEnvironment {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
 			return new StreamContextEnvironment((ContextEnvironment) env);
-		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
+		} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
 			return new StreamPlanEnvironment(env);
 		} else {
 			return createLocalEnvironment();


[4/9] flink git commit: [FLINK-9730][refactor] Fix static accesses via instance reference

Posted by ch...@apache.org.
[FLINK-9730][refactor] Fix static accesses via instance reference

This closes #6247.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cad6e4d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cad6e4d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cad6e4d3

Branch: refs/heads/master
Commit: cad6e4d396e7b901b8c83257312860021f01c060
Parents: 5ee5dbf
Author: lamber-ken <!@#123zxcQ>
Authored: Wed Jul 4 02:36:43 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:08 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/examples/async/AsyncIOExample.java  | 2 +-
 .../apache/flink/queryablestate/network/AbstractServerHandler.java | 2 +-
 .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cad6e4d3/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 95379e3..7fd5f88 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -179,7 +179,7 @@ public class AsyncIOExample {
 
 		@Override
 		public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
-			this.executorService.submit(new Runnable() {
+			executorService.submit(new Runnable() {
 				@Override
 				public void run() {
 					// wait for while to simulate async operation here

http://git-wip-us.apache.org/repos/asf/flink/blob/cad6e4d3/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index b2f7a47..fb835e3 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -165,7 +165,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 		final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
-		final ByteBuf err = serializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
+		final ByteBuf err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
 
 		LOG.debug(msg);
 		ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);

http://git-wip-us.apache.org/repos/asf/flink/blob/cad6e4d3/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 636ae16..089d825 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -517,7 +517,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
 		}
 
-		final int yarnMinAllocationMB = yarnConfiguration.getInt(yarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+		final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
 
 		final ClusterSpecification validClusterSpecification;
 		try {