You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/12/12 14:01:28 UTC
[1/4] flink git commit: [FLINK-4906] [metrics] Introduce constants for IO metrics
Repository: flink
Updated Branches:
refs/heads/master 365cd987c -> 47db9cb1a
[FLINK-4906] [metrics] Introduce constants for IO metrics
This closes #2980.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5dd510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5dd510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5dd510
Branch: refs/heads/master
Commit: dc5dd5106738e393761a62a56d9e684c722c516f
Parents: 4befbb8
Author: zentol <ch...@apache.org>
Authored: Fri Dec 9 14:10:10 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100
----------------------------------------------------------------------
.../webmonitor/handlers/JobDetailsHandler.java | 9 ++---
.../handlers/JobVertexDetailsHandler.java | 9 ++---
.../handlers/JobVertexTaskManagersHandler.java | 9 ++---
.../SubtaskExecutionAttemptDetailsHandler.java | 9 ++---
.../flink/runtime/metrics/MetricNames.java | 38 ++++++++++++++++++++
.../metrics/groups/OperatorIOMetricGroup.java | 9 ++---
.../metrics/groups/TaskIOMetricGroup.java | 21 +++++------
7 files changed, 74 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6de6dc5..35e6ca7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
@@ -161,10 +162,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
fetcher.update();
MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
if (metrics != null) {
- numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
- numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
- numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
- numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+ numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 14dcd39..32626ba 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -99,10 +100,10 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
fetcher.update();
MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
if (metrics != null) {
- numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
- numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
- numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
- numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+ numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index c1fabf8..f468d35 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -120,10 +121,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
fetcher.update();
MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
if (metrics != null) {
- numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
- numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
- numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
- numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+ numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index ca9c7ad..da8db02 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
@@ -86,10 +87,10 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
fetcher.update();
MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
if (metrics != null) {
- numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
- numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0"));
- numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
- numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+ numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+ numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+ numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+ numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
new file mode 100644
index 0000000..9202ca1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+public class MetricNames {
+ private MetricNames() {
+ }
+
+ private static final String SUFFIX_RATE = "PerSecond";
+
+ public static final String IO_NUM_RECORDS_IN = "numRecordsIn";
+ public static final String IO_NUM_RECORDS_OUT = "numRecordsOut";
+ public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE;
+ public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE;
+
+ public static final String IO_NUM_BYTES_IN = "numBytesIn";
+ public static final String IO_NUM_BYTES_IN_LOCAL = IO_NUM_BYTES_IN + "Local";
+ public static final String IO_NUM_BYTES_IN_REMOTE = IO_NUM_BYTES_IN + "Remote";
+ public static final String IO_NUM_BYTES_OUT = "numBytesOut";
+ public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE;
+ public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
+ public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 2e321fe..5bf7d1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.metrics.MetricNames;
/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -35,10 +36,10 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
super(parentMetricGroup);
- numRecordsIn = parentMetricGroup.counter("numRecordsIn");
- numRecordsOut = parentMetricGroup.counter("numRecordsOut");
- numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
- numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+ numRecordsIn = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_IN);
+ numRecordsOut = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT);
+ numRecordsInRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+ numRecordsOutRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
}
public Counter getNumRecordsInCounter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fb..fcea098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.executiongraph.IOMetrics;
@@ -53,16 +54,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
- this.numBytesOut = counter("numBytesOut");
- this.numBytesInLocal = counter("numBytesInLocal");
- this.numBytesInRemote = counter("numBytesInRemote");
- this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
- this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
- this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
- this.numRecordsIn = counter("numRecordsIn", new SumCounter());
- this.numRecordsOut = counter("numRecordsOut", new SumCounter());
- this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
- this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+ this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT);
+ this.numBytesInLocal = counter(MetricNames.IO_NUM_BYTES_IN_LOCAL);
+ this.numBytesInRemote = counter(MetricNames.IO_NUM_BYTES_IN_REMOTE);
+ this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
+ this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
+ this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+ this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
+ this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
+ this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
+ this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
}
public IOMetrics createSnapshot() {
[4/4] flink git commit: [FLINK-5147] Prevent NPE in LocalFS#delete()
Posted by ch...@apache.org.
[FLINK-5147] Prevent NPE in LocalFS#delete()
This closes #2859.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47db9cb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47db9cb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47db9cb1
Branch: refs/heads/master
Commit: 47db9cb1a867870a8da0b403e0ec217ac461ba04
Parents: dc5dd51
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 15:37:06 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:51 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/local/LocalFileSystem.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/47db9cb1/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 3aaa85f..7ad68b3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -184,8 +184,13 @@ public class LocalFileSystem extends FileSystem {
final File file = pathToFile(f);
if (file.isFile()) {
return file.delete();
- } else if ((!recursive) && file.isDirectory() && (file.listFiles().length != 0)) {
- throw new IOException("Directory " + file.toString() + " is not empty");
+ } else if ((!recursive) && file.isDirectory()) {
+ File[] containedFiles = file.listFiles();
+ if (containedFiles == null) {
+ throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
+ } else if (containedFiles.length != 0) {
+ throw new IOException("Directory " + file.toString() + " is not empty");
+ }
}
return delete(file);
[3/4] flink git commit: [FLINK-5211] [metrics] [docs] Include example reporter configuration
Posted by ch...@apache.org.
[FLINK-5211] [metrics] [docs] Include example reporter configuration
This closes #2972.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36bf9938
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36bf9938
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36bf9938
Branch: refs/heads/master
Commit: 36bf99381dbee6425da51c7448d632ced19c0992
Parents: 365cd98
Author: zentol <ch...@apache.org>
Authored: Wed Nov 30 16:20:45 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100
----------------------------------------------------------------------
docs/monitoring/metrics.md | 48 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 48 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36bf9938/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 2b1e9b5..afbce90 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -316,6 +316,16 @@ range is specified the actual port is shown in the relevant job or task manager
specify a port no extra JMX server will be started. Metrics are still available on the default
local JMX interface.
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: jmx
+metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
+metrics.reporter.jmx.port: 8789
+
+{% endhighlight %}
+
### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
Dependency:
{% highlight xml %}
@@ -335,6 +345,21 @@ Parameters:
- `ttl` - time-to-live for transmitted UDP packets
- `addressingMode` - UDP addressing mode to use (UNICAST/MULTICAST)
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: gang
+metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
+metrics.reporter.gang.host: localhost
+metrics.reporter.gang.port: 8649
+metrics.reporter.gang.tmax: 60
+metrics.reporter.gang.dmax: 0
+metrics.reporter.gang.ttl: 1
+metrics.reporter.gang.addressingMode: MULTICAST
+
+{% endhighlight %}
+
### Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
Dependency:
{% highlight xml %}
@@ -351,6 +376,18 @@ Parameters:
- `port` - the Graphite server port
- `protocol` - protocol to use (TCP/UDP)
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: grph
+metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
+metrics.reporter.grph.host: localhost
+metrics.reporter.grph.port: 2003
+metrics.reporter.grph.protocol: TCP
+
+{% endhighlight %}
+
### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
Dependency:
{% highlight xml %}
@@ -366,6 +403,17 @@ Parameters:
- `host` - the StatsD server host
- `port` - the StatsD server port
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: stsd
+metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
+metrics.reporter.stsd.host: localhost
+metrics.reporter.stsd.port: 8125
+
+{% endhighlight %}
+
## System metrics
Flink exposes the following system metrics:
[2/4] flink git commit: [FLINK-5206] [py] Use random file names in tests
Posted by ch...@apache.org.
[FLINK-5206] [py] Use random file names in tests
This closes #2973.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4befbb8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4befbb8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4befbb8c
Branch: refs/heads/master
Commit: 4befbb8cc64ebe16071cd355bf5456785323ea9b
Parents: 36bf993
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:29:03 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 12 12:45:26 2016 +0100
----------------------------------------------------------------------
.../test/python/org/apache/flink/python/api/test_main.py | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4befbb8c/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 29eb36b..5818dc7 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -25,6 +25,7 @@ from flink.functions.GroupReduceFunction import GroupReduceFunction
from flink.plan.Constants import Order, WriteMode
from flink.plan.Constants import INT, STRING
import struct
+from uuid import uuid4
from utils import Id, Verify
if __name__ == "__main__":
@@ -58,14 +59,17 @@ if __name__ == "__main__":
#CSV Source/Sink
csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
- csv_data.write_csv("/tmp/flink/result1", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+ out = "flink_python_" + str(uuid4())
+ csv_data.write_csv("/tmp/flink/" + out, line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
- d8.write_csv("/tmp/flink/result2", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
+ out = "flink_python_" + str(uuid4())
+ d8.write_csv("/tmp/flink/" + out, line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
#Text Source/Sink
text_data = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
- text_data.write_text("/tmp/flink/result2", WriteMode.OVERWRITE)
+ out = "flink_python_" + str(uuid4())
+ text_data.write_text("/tmp/flink/" + out, WriteMode.OVERWRITE)
#Types
env.from_elements(bytearray(b"hello"), bytearray(b"world"))\