Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/11/22 01:36:59 UTC

[2/2] hadoop git commit: HDFS-7331. Add Datanode network counts to datanode jmx page. Contributed by Charles Lamb.

HDFS-7331. Add Datanode network counts to datanode jmx page. Contributed by Charles Lamb.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d4f3e56
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d4f3e56
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d4f3e56

Branch: refs/heads/trunk
Commit: 2d4f3e567e4bb8068c028de12df118a4f3fa6343
Parents: b8c094b
Author: Aaron T. Myers <at...@apache.org>
Authored: Fri Nov 21 16:34:08 2014 -0800
Committer: Aaron T. Myers <at...@apache.org>
Committed: Fri Nov 21 16:36:39 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  2 +
 .../hadoop/hdfs/server/datanode/DataNode.java   | 47 ++++++++++++++++++++
 .../hdfs/server/datanode/DataNodeMXBean.java    |  7 +++
 .../hdfs/server/datanode/DataXceiver.java       | 27 ++++++-----
 .../server/datanode/TestDataNodeMetrics.java    | 18 ++++++++
 6 files changed, 94 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 408a6ed..3f12cec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -385,6 +385,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7420. Delegate permission checks to FSDirectory. (wheat9)
 
+    HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
+    via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index af18f4d..78cae9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -155,6 +155,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final float   DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
   public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
   public static final long    DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
+  public static final String  DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
+  public static final int     DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
 
   // This setting is for testing/internal use only.
   public static final String  DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
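
The new default of Integer.MAX_VALUE effectively leaves the per-host cache
unbounded. As a sketch of how a deployment could cap it (the key and the
Configuration/HdfsConfiguration classes are the real ones; the class name and
the 1024 figure are illustrative only), the value can be overridden in the
Configuration handed to the datanode, or equivalently set as
dfs.datanode.network.counts.cache.max.size in hdfs-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class NetworkCountsCacheConfig {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Track at most 1024 distinct client hosts; Guava evicts entries
    // once the cache grows past this size.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY, 1024);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY, -1));
  }
}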

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a53698a..2ff6870 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -77,6 +79,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -84,6 +87,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -299,6 +305,9 @@ public class DataNode extends ReconfigurableBase
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
   
+  // See the note below in incrDatanodeNetworkErrors re: concurrency.
+  private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
+
   private String hostName;
   private DatanodeID id;
   
@@ -414,6 +423,20 @@ public class DataNode extends ReconfigurableBase
       shutdown();
       throw ie;
     }
+    final int dncCacheMaxSize =
+        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
+            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
+    datanodeNetworkCounts =
+        CacheBuilder.newBuilder()
+            .maximumSize(dncCacheMaxSize)
+            .build(new CacheLoader<String, Map<String, Long>>() {
+              @Override
+              public Map<String, Long> load(String key) throws Exception {
+                final Map<String, Long> ret = new HashMap<String, Long>();
+                ret.put("networkErrors", 0L);
+                return ret;
+              }
+            });
   }
 
   @Override
@@ -1767,6 +1790,30 @@ public class DataNode extends ReconfigurableBase
   public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
+
+  @Override // DataNodeMXBean
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
+    return datanodeNetworkCounts.asMap();
+  }
+
+  void incrDatanodeNetworkErrors(String host) {
+    metrics.incrDatanodeNetworkErrors();
+
+    /*
+     * Synchronizing on the whole cache is a big hammer, but since it's only
+     * accumulating errors, it should be ok. If this is ever expanded to include
+     * non-error stats, then finer-grained concurrency should be applied.
+     */
+    synchronized (datanodeNetworkCounts) {
+      try {
+        final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
+        curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
+        datanodeNetworkCounts.put(host, curCount);
+      } catch (ExecutionException e) {
+        LOG.warn("failed to increment network error counts for " + host);
+      }
+    }
+  }
   
   int getXmitsInProgress() {
     return xmitsInProgress.get();
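
For readers who have not used Guava's caches: a LoadingCache computes a value
on first access via its CacheLoader, so the first error from a new host
materializes a zeroed map, which incrDatanodeNetworkErrors then mutates in
place. A minimal, self-contained sketch of that pattern (the class name and
the 1024 cap are illustrative, not from the commit):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheSketch {
  public static void main(String[] args) throws ExecutionException {
    // Same shape as datanodeNetworkCounts: host -> {counter name -> value}.
    LoadingCache<String, Map<String, Long>> counts = CacheBuilder.newBuilder()
        .maximumSize(1024)
        .build(new CacheLoader<String, Map<String, Long>>() {
          @Override
          public Map<String, Long> load(String host) {
            Map<String, Long> m = new HashMap<String, Long>();
            m.put("networkErrors", 0L); // a new host starts at zero
            return m;
          }
        });

    // get() runs the loader on a miss and throws ExecutionException if the
    // loader fails -- hence the try/catch in incrDatanodeNetworkErrors.
    Map<String, Long> c = counts.get("127.0.0.1");
    c.put("networkErrors", c.get("networkErrors") + 1L);

    // asMap() is the live view that getDatanodeNetworkCounts() hands to JMX.
    System.out.println(counts.asMap()); // {127.0.0.1={networkErrors=1}}
  }
}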

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 8e80c58..92abd88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import java.util.Map;
+
 /**
  * 
  * This is the JMX management interface for data node information
@@ -76,4 +78,9 @@ public interface DataNodeMXBean {
    * actively transferring blocks.
    */
   public int getXceiverCount();
+
+  /**
+   * Gets the network error counts on a per-Datanode basis.
+   */
+  public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
 }
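
Because the interface name ends in MXBean, JMX converts the nested Map to
open types, and the attribute shows up on the datanode's /jmx page. Roughly
(the exact rendering is up to Hadoop's JMX JSON servlet, so treat this as an
approximation rather than verbatim output), a single failed connection from
the loopback address would look like:

  "DatanodeNetworkCounts" : [ {
    "key" : "127.0.0.1",
    "value" : { "networkErrors" : 1 }
  } ]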

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a235c20..61b9c67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -97,6 +97,7 @@ class DataXceiver extends Receiver implements Runnable {
   
   private Peer peer;
   private final String remoteAddress; // address of remote side
+  private final String remoteAddressWithoutPort; // only the address, no port
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
   private final DNConf dnConf;
@@ -129,6 +130,9 @@ class DataXceiver extends Receiver implements Runnable {
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
     remoteAddress = peer.getRemoteAddressString();
+    final int colonIdx = remoteAddress.indexOf(':');
+    remoteAddressWithoutPort =
+        (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
     localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
@@ -222,7 +226,7 @@ class DataXceiver extends Receiver implements Runnable {
               LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
             }
           } else {
-            datanode.metrics.incrDatanodeNetworkErrors();
+            incrDatanodeNetworkErrors();
             throw err;
           }
           break;
@@ -521,7 +525,7 @@ class DataXceiver extends Receiver implements Runnable {
         } catch (IOException ioe) {
           LOG.debug("Error reading client status response. Will close connection.", ioe);
           IOUtils.closeStream(out);
-          datanode.metrics.incrDatanodeNetworkErrors();
+          incrDatanodeNetworkErrors();
         }
       } else {
         IOUtils.closeStream(out);
@@ -543,7 +547,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (!(ioe instanceof SocketTimeoutException)) {
         LOG.warn(dnR + ":Got exception while serving " + block + " to "
           + remoteAddress, ioe);
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -722,7 +726,7 @@ class DataXceiver extends Receiver implements Runnable {
             LOG.info(datanode + ":Exception transfering " +
                      block + " to mirror " + mirrorNode +
                      "- continuing without the mirror", e);
-            datanode.metrics.incrDatanodeNetworkErrors();
+            incrDatanodeNetworkErrors();
           }
         }
       }
@@ -777,7 +781,7 @@ class DataXceiver extends Receiver implements Runnable {
       
     } catch (IOException ioe) {
       LOG.info("opWriteBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       // close all opened streams
@@ -813,7 +817,7 @@ class DataXceiver extends Receiver implements Runnable {
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -908,7 +912,7 @@ class DataXceiver extends Receiver implements Runnable {
       out.flush();
     } catch (IOException ioe) {
       LOG.info("blockChecksum " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -975,7 +979,7 @@ class DataXceiver extends Receiver implements Runnable {
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
-      datanode.metrics.incrDatanodeNetworkErrors();
+      incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -1108,7 +1112,7 @@ class DataXceiver extends Receiver implements Runnable {
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {
         // Don't double count IO errors
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       throw ioe;
     } finally {
@@ -1128,7 +1132,7 @@ class DataXceiver extends Receiver implements Runnable {
         sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
-        datanode.metrics.incrDatanodeNetworkErrors();
+        incrDatanodeNetworkErrors();
       }
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
@@ -1182,6 +1186,9 @@ class DataXceiver extends Receiver implements Runnable {
     out.flush();
   }
   
+  private void incrDatanodeNetworkErrors() {
+    datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
+  }
 
   private void checkAccess(OutputStream out, final boolean reply, 
       final ExtendedBlock blk,
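
The remoteAddressWithoutPort field exists so that errors from the same client
host land on one cache key no matter which ephemeral port the connection
used. A standalone sketch of the same parsing (hypothetical class name; the
indexOf logic is copied from the constructor above):

public class HostKeySketch {
  static String hostOnly(String remoteAddress) {
    final int colonIdx = remoteAddress.indexOf(':');
    return (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
  }

  public static void main(String[] args) {
    // Two connections from the same host on different ephemeral ports
    // aggregate under a single key.
    System.out.println(hostOnly("127.0.0.1:54321")); // 127.0.0.1
    System.out.println(hostOnly("127.0.0.1:59876")); // 127.0.0.1
    // Caveat: an IPv6 literal also contains ':' and would be cut at the
    // first colon by this parsing.
  }
}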

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d4f3e56/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 90112af..0b85d35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -27,7 +27,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
@@ -48,6 +50,9 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
@@ -217,9 +222,22 @@ public class TestDataNodeMetrics {
       out.writeBytes("old gs data\n");
       out.hflush();
 
+      /* Test the metric. */
       final MetricsRecordBuilder dnMetrics =
           getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
       assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+
+      /* Test JMX datanode network counts. */
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      final String allDnc = dnc.toString();
+      assertTrue("expected to see loopback address",
+          allDnc.indexOf("127.0.0.1") >= 0);
+      assertTrue("expected to see networkErrors",
+          allDnc.indexOf("networkErrors") >= 0);
     } finally {
       IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
       if (cluster != null) {
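
Beyond the in-process MBeanServer assertion above, the same attribute can be
inspected on a live datanode over HTTP via the JMX JSON servlet, e.g.
(assuming the default datanode HTTP port of 50075 on this release line):

  curl 'http://<datanode-host>:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo'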