You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/21 11:26:07 UTC

flink git commit: [FLINK-4780] Make GraphiteReporter protocol configurable

Repository: flink
Updated Branches:
  refs/heads/master 45c3d9b76 -> 3137bf774


[FLINK-4780] Make GraphiteReporter protocol configurable

This closes #2677.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3137bf77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3137bf77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3137bf77

Branch: refs/heads/master
Commit: 3137bf7746f7eb43956f0de1f1b789c8eb635d5d
Parents: 45c3d9b
Author: zentol <ch...@apache.org>
Authored: Sun Oct 9 11:19:38 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 21 13:25:46 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  1 +
 .../metrics/graphite/GraphiteReporter.java      | 25 +++++++++++++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3137bf77/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index cb756c5..2b1e9b5 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -349,6 +349,7 @@ Parameters:
 
 - `host` - the Graphite server host
 - `port` - the Graphite server port
+- `protocol` - protocol to use (TCP/UDP)
 
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 Dependency:

http://git-wip-us.apache.org/repos/asf/flink/blob/3137bf77/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index ca301aa..038bc3f 100644
--- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.graphite;
 import com.codahale.metrics.ScheduledReporter;
 import com.codahale.metrics.graphite.Graphite;
 
+import com.codahale.metrics.graphite.GraphiteUDP;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
@@ -30,6 +31,13 @@ import java.util.concurrent.TimeUnit;
 @PublicEvolving
 public class GraphiteReporter extends ScheduledDropwizardReporter {
 
+	public static final String ARG_PROTOCOL = "protocol";
+
+	private enum Protocol {
+		TCP,
+		UDP
+	}
+
 	@Override
 	public ScheduledReporter getReporter(MetricConfig config) {
 		String host = config.getString(ARG_HOST, null);
@@ -42,6 +50,7 @@ public class GraphiteReporter extends ScheduledDropwizardReporter {
 		String prefix = config.getString(ARG_PREFIX, null);
 		String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
 		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
+		String protocol = config.getString(ARG_PROTOCOL, "TCP");
 
 		com.codahale.metrics.graphite.GraphiteReporter.Builder builder =
 			com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
@@ -58,6 +67,20 @@ public class GraphiteReporter extends ScheduledDropwizardReporter {
 			builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
 		}
 
-		return builder.build(new Graphite(host, port));
+		Protocol prot;
+		try {
+			prot = Protocol.valueOf(protocol);
+		} catch (IllegalArgumentException iae) {
+			log.warn("Invalid protocol configuration: " + protocol + " Expected: TCP or UDP, defaulting to TCP.");
+			prot = Protocol.TCP;
+		}
+
+		switch(prot) {
+			case UDP:
+				return builder.build(new GraphiteUDP(host, port));				
+			case TCP:
+			default:
+				return builder.build(new Graphite(host, port));
+		}
 	}
 }