You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2016/02/26 00:47:41 UTC

hadoop git commit: HADOOP-12824. Collect network and disk usage on the node running Windows. Contributed by Inigo Goiri.

Repository: hadoop
Updated Branches:
  refs/heads/trunk c4d4df8de -> b2951f9fb


HADOOP-12824. Collect network and disk usage on the node running Windows. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2951f9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2951f9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2951f9f

Branch: refs/heads/trunk
Commit: b2951f9fbccee8aeab04c1f5ee3fc6db1ef6b2da
Parents: c4d4df8
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Feb 25 15:46:53 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Feb 25 15:47:05 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../org/apache/hadoop/util/SysInfoWindows.java  |  30 +++-
 .../src/main/winutils/systeminfo.c              | 169 ++++++++++++++++++-
 .../apache/hadoop/util/TestSysInfoWindows.java  |  19 ++-
 4 files changed, 204 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2951f9f/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index c91820e..e5ce0ee 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1158,6 +1158,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12535. Run FileSystem contract tests with hadoop-azure.
     (Madhumita Chakraborty via cnauroth)
 
+    HADOOP-12824. Collect network and disk usage on the node running Windows.
+    (Inigo Goiri via xyao)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2951f9f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
index b65569b..de0c43b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
@@ -44,6 +44,10 @@ public class SysInfoWindows extends SysInfo {
   private long cpuFrequencyKhz;
   private long cumulativeCpuTimeMs;
   private float cpuUsage;
+  private long storageBytesRead;
+  private long storageBytesWritten;
+  private long netBytesRead;
+  private long netBytesWritten;
 
   private long lastRefreshTime;
   static final int REFRESH_INTERVAL_MS = 1000;
@@ -67,6 +71,10 @@ public class SysInfoWindows extends SysInfo {
     cpuFrequencyKhz = -1;
     cumulativeCpuTimeMs = -1;
     cpuUsage = -1;
+    storageBytesRead = -1;
+    storageBytesWritten = -1;
+    netBytesRead = -1;
+    netBytesWritten = -1;
   }
 
   String getSystemInfoInfoFromShell() {
@@ -91,7 +99,7 @@ public class SysInfoWindows extends SysInfo {
       reset();
       String sysInfoStr = getSystemInfoInfoFromShell();
       if (sysInfoStr != null) {
-        final int sysInfoSplitCount = 7;
+        final int sysInfoSplitCount = 11;
         String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n"))
             .split(",");
         if (sysInfo.length == sysInfoSplitCount) {
@@ -103,6 +111,10 @@ public class SysInfoWindows extends SysInfo {
             numProcessors = Integer.parseInt(sysInfo[4]);
             cpuFrequencyKhz = Long.parseLong(sysInfo[5]);
             cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]);
+            storageBytesRead = Long.parseLong(sysInfo[7]);
+            storageBytesWritten = Long.parseLong(sysInfo[8]);
+            netBytesRead = Long.parseLong(sysInfo[9]);
+            netBytesWritten = Long.parseLong(sysInfo[10]);
             if (lastCumCpuTimeMs != -1) {
               /**
                * This number will be the aggregated usage across all cores in
@@ -203,27 +215,27 @@ public class SysInfoWindows extends SysInfo {
   /** {@inheritDoc} */
   @Override
   public long getNetworkBytesRead() {
-    // TODO unimplemented
-    return 0L;
+    refreshIfNeeded();
+    return netBytesRead;
   }
 
   /** {@inheritDoc} */
   @Override
   public long getNetworkBytesWritten() {
-    // TODO unimplemented
-    return 0L;
+    refreshIfNeeded();
+    return netBytesWritten;
   }
 
   @Override
   public long getStorageBytesRead() {
-    // TODO unimplemented
-    return 0L;
+    refreshIfNeeded();
+    return storageBytesRead;
   }
 
   @Override
   public long getStorageBytesWritten() {
-    // TODO unimplemented
-    return 0L;
+    refreshIfNeeded();
+    return storageBytesWritten;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2951f9f/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c b/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c
index 48f03ed..b7093a7 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/systeminfo.c
@@ -18,6 +18,8 @@
 #include "winutils.h"
 #include <psapi.h>
 #include <PowrProf.h>
+#include <pdh.h>
+#include <pdhmsg.h>
 
 #ifdef PSAPI_VERSION
 #undef PSAPI_VERSION
@@ -25,6 +27,12 @@
 #define PSAPI_VERSION 1
 #pragma comment(lib, "psapi.lib")
 #pragma comment(lib, "Powrprof.lib")
+#pragma comment(lib, "pdh.lib")
+
+CONST PWSTR COUNTER_PATH_NET_READ_ALL   = L"\\Network Interface(*)\\Bytes Received/Sec";
+CONST PWSTR COUNTER_PATH_NET_WRITE_ALL  = L"\\Network Interface(*)\\Bytes Sent/Sec";
+CONST PWSTR COUNTER_PATH_DISK_READ_ALL  = L"\\LogicalDisk(*)\\Disk Read Bytes/sec";
+CONST PWSTR COUNTER_PATH_DISK_WRITE_ALL = L"\\LogicalDisk(*)\\Disk Write Bytes/sec";
 
 typedef struct _PROCESSOR_POWER_INFORMATION {
    ULONG  Number;
@@ -57,6 +65,7 @@ int SystemInfo()
   PROCESSOR_POWER_INFORMATION const *ppi;
   ULONGLONG cpuFrequencyKhz;
   NTSTATUS status;
+  LONGLONG diskRead, diskWrite, netRead, netWrite;
 
   ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION));
   memInfo.cb = sizeof(PERFORMANCE_INFORMATION);
@@ -105,8 +114,16 @@ int SystemInfo()
   cpuFrequencyKhz = ppi->MaxMhz*1000;
   LocalFree(pBuffer);
 
-  fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u\n", vmemSize, memSize,
-    vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs);
+  status = GetDiskAndNetwork(&diskRead, &diskWrite, &netRead, &netWrite);
+  if(0 != status)
+  {
+    fwprintf_s(stderr, L"Error in GetDiskAndNetwork. Err:%d\n", status);
+    return EXIT_FAILURE;
+  }
+
+  fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u,%I64d,%I64d,%I64d,%I64d\n", vmemSize, memSize,
+    vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs,
+    diskRead, diskWrite, netRead, netWrite);
 
   return EXIT_SUCCESS;
 }
@@ -120,5 +137,151 @@ void SystemInfoUsage()
     VirtualMemorySize(bytes),PhysicalMemorySize(bytes),\n\
     FreeVirtualMemory(bytes),FreePhysicalMemory(bytes),\n\
     NumberOfProcessors,CpuFrequency(Khz),\n\
-    CpuTime(MilliSec,Kernel+User)\n");
+    CpuTime(MilliSec,Kernel+User),\n\
+    DiskRead(bytes),DiskWrite(bytes),\n\
+    NetworkRead(bytes),NetworkWrite(bytes)\n");
+}
+
+int GetDiskAndNetwork(LONGLONG* diskRead, LONGLONG* diskWrite, LONGLONG* netRead, LONGLONG* netWrite)
+{
+  int ret = EXIT_SUCCESS;
+  PDH_STATUS status = ERROR_SUCCESS;
+  PDH_HQUERY hQuery = NULL;
+  PDH_HCOUNTER hCounterNetRead = NULL;
+  PDH_HCOUNTER hCounterNetWrite = NULL;
+  PDH_HCOUNTER hCounterDiskRead = NULL;
+  PDH_HCOUNTER hCounterDiskWrite = NULL;
+  DWORD i;
+
+  if(status = PdhOpenQuery(NULL, 0, &hQuery))
+  {
+    fwprintf_s(stderr, L"PdhOpenQuery failed with 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+
+  // Add each one of the counters with wild cards
+  if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_READ_ALL, 0, &hCounterNetRead))
+  {
+    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_READ_ALL, status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+  if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_WRITE_ALL, 0, &hCounterNetWrite))
+  {
+    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_WRITE_ALL, status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+  if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_READ_ALL, 0, &hCounterDiskRead))
+  {
+    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_READ_ALL, status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+  if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_WRITE_ALL, 0, &hCounterDiskWrite))
+  {
+    fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_WRITE_ALL, status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+
+  if(status = PdhCollectQueryData(hQuery))
+  {
+    fwprintf_s(stderr, L"PdhCollectQueryData() failed with 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+    goto cleanup;
+  }
+
+  // Read and aggregate counters
+  status = ReadTotalCounter(hCounterNetRead, netRead);
+  if(ERROR_SUCCESS != status)
+  {
+    fwprintf_s(stderr, L"ReadTotalCounter(Network Read): Error 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+  }
+
+  status = ReadTotalCounter(hCounterNetWrite, netWrite);
+  if(ERROR_SUCCESS != status)
+  {
+    fwprintf_s(stderr, L"ReadTotalCounter(Network Write): Error 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+  }
+
+  status = ReadTotalCounter(hCounterDiskRead, diskRead);
+  if(ERROR_SUCCESS != status)
+  {
+    fwprintf_s(stderr, L"ReadTotalCounter(Disk Read): Error 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+  }
+
+  status = ReadTotalCounter(hCounterDiskWrite, diskWrite);
+  if(ERROR_SUCCESS != status)
+  {
+    fwprintf_s(stderr, L"ReadTotalCounter(Disk Write): Error 0x%x.\n", status);
+    ret = EXIT_FAILURE;
+  }
+
+cleanup:
+  if (hQuery)
+  {
+    status = PdhCloseQuery(hQuery);
+  }
+
+  return ret;
+}
+
+PDH_STATUS ReadTotalCounter(PDH_HCOUNTER hCounter, LONGLONG* ret)
+{
+  PDH_STATUS status = ERROR_SUCCESS;
+  DWORD i = 0;
+  DWORD dwBufferSize = 0;
+  DWORD dwItemCount = 0;
+  PDH_RAW_COUNTER_ITEM *pItems = NULL;
+
+  // Initialize output
+  *ret = 0;
+
+  // Get the required size of the pItems buffer
+  status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, NULL);
+  if (PDH_MORE_DATA == status)
+  {
+    pItems = (PDH_RAW_COUNTER_ITEM *) malloc(dwBufferSize);
+    if (pItems)
+    {
+      // Actually query the counter
+      status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, pItems);
+      if (ERROR_SUCCESS == status) {
+        for (i = 0; i < dwItemCount; i++) {
+          if (wcscmp(L"_Total", pItems[i].szName) == 0) {
+            *ret = pItems[i].RawValue.FirstValue;
+            break;
+          } else {
+            *ret += pItems[i].RawValue.FirstValue;
+          }
+        }
+      } else {
+        *ret = -1;
+        goto cleanup;
+      }
+      // Reset structures
+      free(pItems);
+      pItems = NULL;
+      dwBufferSize = dwItemCount = 0;
+    } else {
+      *ret = -1;
+      status = PDH_NO_DATA;
+      goto cleanup;
+    }
+  } else {
+    *ret = -1;
+    goto cleanup;
+  }
+
+cleanup:
+  if (pItems) {
+    free(pItems);
+  }
+
+  return status;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2951f9f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java
index 2544e7c..5551576 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java
@@ -47,7 +47,8 @@ public class TestSysInfoWindows {
   public void parseSystemInfoString() {
     SysInfoWindowsMock tester = new SysInfoWindowsMock();
     tester.setSysinfoString(
-        "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
+        "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
+        "1234567,2345678,3456789,4567890\r\n");
     // info str derived from windows shell command has \r\n termination
     assertEquals(17177038848L, tester.getVirtualMemorySize());
     assertEquals(8589467648L, tester.getPhysicalMemorySize());
@@ -57,6 +58,10 @@ public class TestSysInfoWindows {
     assertEquals(1, tester.getNumCores());
     assertEquals(2805000L, tester.getCpuFrequency());
     assertEquals(6261812L, tester.getCumulativeCpuTime());
+    assertEquals(1234567L, tester.getStorageBytesRead());
+    assertEquals(2345678L, tester.getStorageBytesWritten());
+    assertEquals(3456789L, tester.getNetworkBytesRead());
+    assertEquals(4567890L, tester.getNetworkBytesWritten());
     // undef on first call
     assertEquals((float)CpuTimeTracker.UNAVAILABLE,
         tester.getCpuUsagePercentage(), 0.0);
@@ -68,7 +73,8 @@ public class TestSysInfoWindows {
   public void refreshAndCpuUsage() throws InterruptedException {
     SysInfoWindowsMock tester = new SysInfoWindowsMock();
     tester.setSysinfoString(
-        "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
+        "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
+        "1234567,2345678,3456789,4567890\r\n");
     // info str derived from windows shell command has \r\n termination
     tester.getAvailablePhysicalMemorySize();
     // verify information has been refreshed
@@ -79,7 +85,8 @@ public class TestSysInfoWindows {
         tester.getNumVCoresUsed(), 0.0);
 
     tester.setSysinfoString(
-        "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n");
+        "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012," +
+        "1234567,2345678,3456789,4567890\r\n");
     tester.getAvailablePhysicalMemorySize();
     // verify information has not been refreshed
     assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
@@ -106,12 +113,14 @@ public class TestSysInfoWindows {
     // test with 12 cores
     SysInfoWindowsMock tester = new SysInfoWindowsMock();
     tester.setSysinfoString(
-        "17177038848,8589467648,15232745472,6400417792,12,2805000,6261812\r\n");
+        "17177038848,8589467648,15232745472,6400417792,12,2805000,6261812," +
+        "1234567,2345678,3456789,4567890\r\n");
     // verify information has been refreshed
     assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
 
     tester.setSysinfoString(
-        "17177038848,8589467648,15232745472,5400417792,12,2805000,6263012\r\n");
+        "17177038848,8589467648,15232745472,5400417792,12,2805000,6263012," +
+        "1234567,2345678,3456789,4567890\r\n");
     // verify information has not been refreshed
     assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());