You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/12/12 14:01:28 UTC

[1/4] flink git commit: [FLINK-4906] [metrics] Introduce constants for IO metrics

Repository: flink
Updated Branches:
  refs/heads/master 365cd987c -> 47db9cb1a


[FLINK-4906] [metrics] Introduce constants for IO metrics

This closes #2980.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5dd510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5dd510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5dd510

Branch: refs/heads/master
Commit: dc5dd5106738e393761a62a56d9e684c722c516f
Parents: 4befbb8
Author: zentol <ch...@apache.org>
Authored: Fri Dec 9 14:10:10 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100

----------------------------------------------------------------------
 .../webmonitor/handlers/JobDetailsHandler.java  |  9 ++---
 .../handlers/JobVertexDetailsHandler.java       |  9 ++---
 .../handlers/JobVertexTaskManagersHandler.java  |  9 ++---
 .../SubtaskExecutionAttemptDetailsHandler.java  |  9 ++---
 .../flink/runtime/metrics/MetricNames.java      | 38 ++++++++++++++++++++
 .../metrics/groups/OperatorIOMetricGroup.java   |  9 ++---
 .../metrics/groups/TaskIOMetricGroup.java       | 21 +++++------
 7 files changed, 74 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6de6dc5..35e6ca7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
@@ -161,10 +162,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 					fetcher.update();
 					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
 					if (metrics != null) {
-						numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-						numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-						numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-						numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+						numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+						numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+						numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+						numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 14dcd39..32626ba 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -99,10 +100,10 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 				fetcher.update();
 				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
 				if (metrics != null) {
-					numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-					numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-					numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-					numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+					numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+					numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+					numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+					numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index c1fabf8..f468d35 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -120,10 +121,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 					fetcher.update();
 					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
 					if (metrics != null) {
-						numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-						numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-						numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-						numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+						numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+						numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+						numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+						numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index ca9c7ad..da8db02 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -86,10 +87,10 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 			fetcher.update();
 			MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
 			if (metrics != null) {
-				numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
-				numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0"));
-				numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
-				numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+				numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+				numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+				numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+				numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
 			}
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
new file mode 100644
index 0000000..9202ca1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+public class MetricNames {
+	private MetricNames() {
+	}
+
+	private static final String SUFFIX_RATE = "PerSecond";
+
+	public static final String IO_NUM_RECORDS_IN = "numRecordsIn";
+	public static final String IO_NUM_RECORDS_OUT = "numRecordsOut";
+	public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE;
+	public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE;
+
+	public static final String IO_NUM_BYTES_IN = "numBytesIn";
+	public static final String IO_NUM_BYTES_IN_LOCAL = IO_NUM_BYTES_IN + "Local";
+	public static final String IO_NUM_BYTES_IN_REMOTE = IO_NUM_BYTES_IN + "Remote";
+	public static final String IO_NUM_BYTES_OUT = "numBytesOut";
+	public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE;
+	public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
+	public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 2e321fe..5bf7d1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.metrics.MetricNames;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -35,10 +36,10 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 
 	public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
 		super(parentMetricGroup);
-		numRecordsIn = parentMetricGroup.counter("numRecordsIn");
-		numRecordsOut = parentMetricGroup.counter("numRecordsOut");
-		numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
-		numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+		numRecordsIn = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_IN);
+		numRecordsOut = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT);
+		numRecordsInRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+		numRecordsOutRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
 	}
 
 	public Counter getNumRecordsInCounter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fb..fcea098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 
@@ -53,16 +54,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
-		this.numBytesOut = counter("numBytesOut");
-		this.numBytesInLocal = counter("numBytesInLocal");
-		this.numBytesInRemote = counter("numBytesInRemote");
-		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
-		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
-		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
-		this.numRecordsIn = counter("numRecordsIn", new SumCounter());
-		this.numRecordsOut = counter("numRecordsOut", new SumCounter());
-		this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
-		this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+		this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT);
+		this.numBytesInLocal = counter(MetricNames.IO_NUM_BYTES_IN_LOCAL);
+		this.numBytesInRemote = counter(MetricNames.IO_NUM_BYTES_IN_REMOTE);
+		this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
+		this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
+		this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+		this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
+		this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
+		this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+		this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
 	}
 
 	public IOMetrics createSnapshot() {


[4/4] flink git commit: [FLINK-5147] Prevent NPE in LocalFS#delete()

Posted by ch...@apache.org.
[FLINK-5147] Prevent NPE in LocalFS#delete()

This closes #2859.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47db9cb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47db9cb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47db9cb1

Branch: refs/heads/master
Commit: 47db9cb1a867870a8da0b403e0ec217ac461ba04
Parents: dc5dd51
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 15:37:06 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:51 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/local/LocalFileSystem.java     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47db9cb1/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 3aaa85f..7ad68b3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -184,8 +184,13 @@ public class LocalFileSystem extends FileSystem {
 		final File file = pathToFile(f);
 		if (file.isFile()) {
 			return file.delete();
-		} else if ((!recursive) && file.isDirectory() && (file.listFiles().length != 0)) {
-			throw new IOException("Directory " + file.toString() + " is not empty");
+		} else if ((!recursive) && file.isDirectory()) {
+			File[] containedFiles = file.listFiles();
+			if (containedFiles == null) {
+				throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
+			} else if (containedFiles.length != 0) {
+				throw new IOException("Directory " + file.toString() + " is not empty");
+			}
 		}
 
 		return delete(file);


[3/4] flink git commit: [FLINK-5211] [metrics] [docs] Include example reporter configuration

Posted by ch...@apache.org.
[FLINK-5211] [metrics] [docs] Include example reporter configuration

This closes #2972.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36bf9938
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36bf9938
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36bf9938

Branch: refs/heads/master
Commit: 36bf99381dbee6425da51c7448d632ced19c0992
Parents: 365cd98
Author: zentol <ch...@apache.org>
Authored: Wed Nov 30 16:20:45 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md | 48 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36bf9938/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 2b1e9b5..afbce90 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -316,6 +316,16 @@ range is specified the actual port is shown in the relevant job or task manager
 specify a port no extra JMX server will be started. Metrics are still available on the default
 local JMX interface.
 
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: jmx
+metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.port: 8789
+
+{% endhighlight %}
+
 ### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
 Dependency:
 {% highlight xml %}
@@ -335,6 +345,21 @@ Parameters:
 - `ttl` - time-to-live for transmitted UDP packets
 - `addressingMode` - UDP addressing mode to use (UNICAST/MULTICAST)
 
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: gang
+metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
+metrics.reporter.gang.host: localhost
+metrics.reporter.gang.port: 8649
+metrics.reporter.gang.tmax: 60
+metrics.reporter.gang.dmax: 0
+metrics.reporter.gang.ttl: 1
+metrics.reporter.gang.addressingMode: MULTICAST
+
+{% endhighlight %}
+
 ### Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
 Dependency:
 {% highlight xml %}
@@ -351,6 +376,18 @@ Parameters:
 - `port` - the Graphite server port
 - `protocol` - protocol to use (TCP/UDP)
 
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: grph
+metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
+metrics.reporter.grph.host: localhost
+metrics.reporter.grph.port: 2003
+metrics.reporter.grph.protocol: TCP
+
+{% endhighlight %}
+
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 Dependency:
 {% highlight xml %}
@@ -366,6 +403,17 @@ Parameters:
 - `host` - the StatsD server host
 - `port` - the StatsD server port
 
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: stsd
+metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
+metrics.reporter.stsd.host: localhost
+metrics.reporter.stsd.port: 8125
+
+{% endhighlight %}
+
 ## System metrics
 
 Flink exposes the following system metrics:


[2/4] flink git commit: [FLINK-5206] [py] Use random file names in tests

Posted by ch...@apache.org.
[FLINK-5206] [py] Use random file names in tests

This closes #2973.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4befbb8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4befbb8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4befbb8c

Branch: refs/heads/master
Commit: 4befbb8cc64ebe16071cd355bf5456785323ea9b
Parents: 36bf993
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:29:03 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100

----------------------------------------------------------------------
 .../test/python/org/apache/flink/python/api/test_main.py  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4befbb8c/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 29eb36b..5818dc7 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -25,6 +25,7 @@ from flink.functions.GroupReduceFunction import GroupReduceFunction
 from flink.plan.Constants import Order, WriteMode
 from flink.plan.Constants import INT, STRING
 import struct
+from uuid import uuid4
 from utils import Id, Verify
 
 if __name__ == "__main__":
@@ -58,14 +59,17 @@ if __name__ == "__main__":
     #CSV Source/Sink
     csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
 
-    csv_data.write_csv("/tmp/flink/result1", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+    out = "flink_python_" + str(uuid4())
+    csv_data.write_csv("/tmp/flink/" + out, line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
 
-    d8.write_csv("/tmp/flink/result2", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+    out = "flink_python_" + str(uuid4())
+    d8.write_csv("/tmp/flink/" + out, line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
 
     #Text Source/Sink
     text_data = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
 
-    text_data.write_text("/tmp/flink/result2", WriteMode.OVERWRITE)
+    out = "flink_python_" + str(uuid4())
+    text_data.write_text("/tmp/flink/" + out, WriteMode.OVERWRITE)
 
     #Types
     env.from_elements(bytearray(b"hello"), bytearray(b"world"))\