You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/12 19:19:14 UTC

[32/34] hadoop git commit: HADOOP-11400. GraphiteSink does not reconnect to Graphite after 'broken pipe' (Kamil Gorlo via raviprak)

HADOOP-11400. GraphiteSink does not reconnect to Graphite after 'broken pipe'
(Kamil Gorlo via raviprak)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c5eb1bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c5eb1bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c5eb1bc

Branch: refs/heads/HDFS-EC
Commit: 1c5eb1bc8acbf02cba9c422a7b2d220e37fc59e9
Parents: d77a397
Author: Ravi Prakash <ra...@apache.org>
Authored: Sat Jan 10 08:35:40 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Jan 12 10:18:03 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../hadoop/metrics2/sink/GraphiteSink.java      | 141 ++++++++++++++-----
 .../metrics2/impl/TestGraphiteMetrics.java      | 111 +++++++++------
 3 files changed, 175 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c5eb1bc/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 90bfe3e..679989a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -488,6 +488,9 @@ Release 2.7.0 - UNRELEASED
 
   BUG FIXES
 
+    HADOOP-11400. GraphiteSink does not reconnect to Graphite after 'broken pipe'
+    (Kamil Gorlo via raviprak)
+
     HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)
 
     HADOOP-11166. Remove ulimit from test-patch.sh. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c5eb1bc/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java
index e72fe24..e46a654 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java
@@ -18,25 +18,24 @@
 
 package org.apache.hadoop.metrics2.sink;
 
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.io.Closeable;
-import java.net.Socket;
-
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+
 /**
  * A metrics sink that writes to a Graphite server
  */
@@ -47,30 +46,22 @@ public class GraphiteSink implements MetricsSink, Closeable {
     private static final String SERVER_HOST_KEY = "server_host";
     private static final String SERVER_PORT_KEY = "server_port";
     private static final String METRICS_PREFIX = "metrics_prefix";
-    private Writer writer = null;
     private String metricsPrefix = null;
-    private Socket socket = null;
+    private Graphite graphite = null;
 
     @Override
     public void init(SubsetConfiguration conf) {
         // Get Graphite host configurations.
-        String serverHost = conf.getString(SERVER_HOST_KEY);
-        Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
+        final String serverHost = conf.getString(SERVER_HOST_KEY);
+        final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
 
         // Get Graphite metrics graph prefix.
         metricsPrefix = conf.getString(METRICS_PREFIX);
         if (metricsPrefix == null)
             metricsPrefix = "";
 
-        try {
-            // Open an connection to Graphite server.
-            socket = new Socket(serverHost, serverPort);
-            writer = new OutputStreamWriter(
-                socket.getOutputStream(), Charsets.UTF_8);
-        } catch (Exception e) {
-            throw new MetricsException("Error creating connection, "
-                    + serverHost + ":" + serverPort, e);
-        }
+        graphite = new Graphite(serverHost, serverPort);
+        graphite.connect();
     }
 
     @Override
@@ -104,39 +95,111 @@ public class GraphiteSink implements MetricsSink, Closeable {
         }
 
         try {
-            if(writer != null){
-              writer.write(lines.toString());
-            } else {
-              throw new MetricsException("Writer in GraphiteSink is null!");
-            }
+          graphite.write(lines.toString());
         } catch (Exception e) {
-            throw new MetricsException("Error sending metrics", e);
+          LOG.warn("Error sending metrics to Graphite", e);
+          try {
+            graphite.close();
+          } catch (Exception e1) {
+            throw new MetricsException("Error closing connection to Graphite", e1);
+          }
         }
     }
 
     @Override
     public void flush() {
+      try {
+        graphite.flush();
+      } catch (Exception e) {
+        LOG.warn("Error flushing metrics to Graphite", e);
         try {
-            writer.flush();
-        } catch (Exception e) {
-            throw new MetricsException("Error flushing metrics", e);
+          graphite.close();
+        } catch (Exception e1) {
+          throw new MetricsException("Error closing connection to Graphite", e1);
         }
+      }
     }
 
     @Override
     public void close() throws IOException {
-      try {
-        IOUtils.closeStream(writer);
-        writer = null;
-        LOG.info("writer in GraphiteSink is closed!");
-      } catch (Throwable e){
-        throw new MetricsException("Error closing writer", e);
-      } finally {
-        if (socket != null && !socket.isClosed()) {
-          socket.close();
+      graphite.close();
+    }
+
+    public static class Graphite {
+      private final static int MAX_CONNECTION_FAILURES = 5;
+
+      private String serverHost;
+      private int serverPort;
+      private Writer writer = null;
+      private Socket socket = null;
+      private int connectionFailures = 0;
+
+      public Graphite(String serverHost, int serverPort) {
+        this.serverHost = serverHost;
+        this.serverPort = serverPort;
+      }
+
+      public void connect() {
+        if (isConnected()) {
+          throw new MetricsException("Already connected to Graphite");
+        }
+        if (tooManyConnectionFailures()) {
+          // Return silently; an ERROR was already logged when the failure limit was first reached.
+          return;
+        }
+        try {
+          // Open a connection to Graphite server.
+          socket = new Socket(serverHost, serverPort);
+          writer = new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8);
+        } catch (Exception e) {
+          connectionFailures++;
+          if (tooManyConnectionFailures()) {
+            // first time when connection limit reached, report to logs
+            LOG.error("Too many connection failures, would not try to connect again.");
+          }
+          throw new MetricsException("Error creating connection, "
+              + serverHost + ":" + serverPort, e);
+        }
+      }
+
+      public void write(String msg) throws IOException {
+        if (!isConnected()) {
+          connect();
+        }
+        if (isConnected()) {
+          writer.write(msg);
+        }
+      }
+
+      public void flush() throws IOException {
+        if (isConnected()) {
+          writer.flush();
+        }
+      }
+
+      public boolean isConnected() {
+        return socket != null && socket.isConnected() && !socket.isClosed();
+      }
+
+      public void close() throws IOException {
+        try {
+          if (writer != null) {
+            writer.close();
+          }
+        } catch (IOException ex) {
+          if (socket != null) {
+            socket.close();
+          }
+        } finally {
           socket = null;
-          LOG.info("socket in GraphiteSink is closed!");
+          writer = null;
         }
       }
+
+      private boolean tooManyConnectionFailures() {
+        return connectionFailures > MAX_CONNECTION_FAILURES;
+      }
+
     }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c5eb1bc/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java
index 09f0081..5fac41e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java
@@ -18,23 +18,7 @@
 
 package org.apache.hadoop.metrics2.impl;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.sink.GraphiteSink;
@@ -42,6 +26,23 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+
+
 public class TestGraphiteMetrics {
     private AbstractMetric makeMetric(String name, Number value) {
         AbstractMetric metric = mock(AbstractMetric.class);
@@ -50,6 +51,12 @@ public class TestGraphiteMetrics {
         return metric;
     }
 
+    private GraphiteSink.Graphite makeGraphite() {
+      GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class);
+      when(mockGraphite.isConnected()).thenReturn(true);
+      return mockGraphite;
+    }
+
     @Test
     public void testPutMetrics() {
         GraphiteSink sink = new GraphiteSink();
@@ -61,18 +68,18 @@ public class TestGraphiteMetrics {
         metrics.add(makeMetric("foo2", 2.25));
         MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
 
-        OutputStreamWriter mockWriter = mock(OutputStreamWriter.class);
         ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
-        Whitebox.setInternalState(sink, "writer", mockWriter);
+        final GraphiteSink.Graphite mockGraphite = makeGraphite();
+        Whitebox.setInternalState(sink, "graphite", mockGraphite);
         sink.putMetrics(record);
 
         try {
-            verify(mockWriter).write(argument.capture());
+          verify(mockGraphite).write(argument.capture());
         } catch (IOException e) {
-            e.printStackTrace();
+          e.printStackTrace();
         }
 
-        String result = argument.getValue().toString();
+        String result = argument.getValue();
 
         assertEquals(true,
             result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" +
@@ -86,24 +93,25 @@ public class TestGraphiteMetrics {
         GraphiteSink sink = new GraphiteSink();
         List<MetricsTag> tags = new ArrayList<MetricsTag>();
         tags.add(new MetricsTag(MsInfo.Context, "all"));
-        tags.add(new MetricsTag(MsInfo.Hostname, null));
+      tags.add(new MetricsTag(MsInfo.Hostname, null));
         Set<AbstractMetric> metrics = new HashSet<AbstractMetric>();
         metrics.add(makeMetric("foo1", 1));
         metrics.add(makeMetric("foo2", 2));
         MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
 
-        OutputStreamWriter mockWriter = mock(OutputStreamWriter.class);
+
         ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
-        Whitebox.setInternalState(sink, "writer", mockWriter);
+        final GraphiteSink.Graphite mockGraphite = makeGraphite();
+        Whitebox.setInternalState(sink, "graphite", mockGraphite);
         sink.putMetrics(record);
 
         try {
-            verify(mockWriter).write(argument.capture());
+            verify(mockGraphite).write(argument.capture());
         } catch (IOException e) {
             e.printStackTrace();
         }
 
-        String result = argument.getValue().toString();
+        String result = argument.getValue();
 
         assertEquals(true,
             result.equals("null.all.Context.Context=all.foo1 1 10\n" + 
@@ -120,8 +128,8 @@ public class TestGraphiteMetrics {
 
       // setup GraphiteSink
       GraphiteSink sink = new GraphiteSink();
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      Whitebox.setInternalState(sink, "writer", new OutputStreamWriter(out));
+      final GraphiteSink.Graphite mockGraphite = makeGraphite();
+      Whitebox.setInternalState(sink, "graphite", mockGraphite);
 
       // given two metrics records with timestamps 1000 milliseconds apart.
       List<MetricsTag> tags = Collections.emptyList();
@@ -141,15 +149,16 @@ public class TestGraphiteMetrics {
       }
 
       // then the timestamps in the graphite stream should differ by one second.
-      String expectedOutput
-        = "null.default.Context.foo1 1 1000000000\n"
-        + "null.default.Context.foo1 1 1000000001\n";
-      assertEquals(expectedOutput, out.toString());
+      try {
+        verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000000\n"));
+        verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000001\n"));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
 
-
-    @Test(expected=MetricsException.class)
-    public void testCloseAndWrite() throws IOException {
+    @Test
+    public void testFailureAndPutMetrics() throws IOException {
       GraphiteSink sink = new GraphiteSink();
       List<MetricsTag> tags = new ArrayList<MetricsTag>();
       tags.add(new MetricsTag(MsInfo.Context, "all"));
@@ -159,18 +168,38 @@ public class TestGraphiteMetrics {
       metrics.add(makeMetric("foo2", 2.25));
       MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
 
-      OutputStreamWriter writer = mock(OutputStreamWriter.class);
+      final GraphiteSink.Graphite mockGraphite = makeGraphite();
+      Whitebox.setInternalState(sink, "graphite", mockGraphite);
+
+      // throw an exception on the first write attempt
+      doThrow(new IOException("IO exception")).when(mockGraphite).write(anyString());
 
-      Whitebox.setInternalState(sink, "writer", writer);
-      sink.close();
       sink.putMetrics(record);
+      verify(mockGraphite).write(anyString());
+      verify(mockGraphite).close();
+
+      // reset mock and try again
+      reset(mockGraphite);
+      when(mockGraphite.isConnected()).thenReturn(false);
+
+      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+      sink.putMetrics(record);
+
+      verify(mockGraphite).write(argument.capture());
+      String result = argument.getValue();
+
+      assertEquals(true,
+          result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" +
+          "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") ||
+          result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" +
+          "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n"));
     }
 
     @Test
     public void testClose(){
         GraphiteSink sink = new GraphiteSink();
-        Writer mockWriter = mock(Writer.class);
-        Whitebox.setInternalState(sink, "writer", mockWriter);
+        final GraphiteSink.Graphite mockGraphite = makeGraphite();
+        Whitebox.setInternalState(sink, "graphite", mockGraphite);
         try {
             sink.close();
         } catch (IOException ioe) {
@@ -178,7 +207,7 @@ public class TestGraphiteMetrics {
         }
 
         try {
-            verify(mockWriter).close();
+            verify(mockGraphite).close();
         } catch (IOException ioe) {
             ioe.printStackTrace();
         }