You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/09 18:48:15 UTC

hadoop git commit: HADOOP-15161. s3a: Stream and common statistics missing from metrics. Contributed by Sean Mackrory.

Repository: hadoop
Updated Branches:
  refs/heads/trunk f725b9e26 -> b62a5ece9


HADOOP-15161. s3a: Stream and common statistics missing from metrics
Contributed by Sean Mackrory


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b62a5ece
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b62a5ece
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b62a5ece

Branch: refs/heads/trunk
Commit: b62a5ece95a6b5bbb17f273debd55bcbf0c5f28c
Parents: f725b9e
Author: Steve Loughran <st...@apache.org>
Authored: Tue Jan 9 18:46:52 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Jan 9 18:46:52 2018 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  7 +-
 .../hadoop/fs/s3a/S3AInstrumentation.java       | 75 ++++++--------------
 .../org/apache/hadoop/fs/s3a/Statistic.java     |  8 +++
 .../apache/hadoop/fs/s3a/ITestS3AMetrics.java   | 24 ++++++-
 4 files changed, 55 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b62a5ece/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index a8147ed..62b97d6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -688,7 +688,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
    */
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
-    checkNotClosed();
+    entryPoint(INVOCATION_OPEN);
     LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
     final FileStatus fileStatus = getFileStatus(f);
     if (fileStatus.isDirectory()) {
@@ -732,7 +732,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
-    checkNotClosed();
+    entryPoint(INVOCATION_CREATE);
     final Path path = qualify(f);
     String key = pathToKey(path);
     FileStatus status = null;
@@ -799,6 +799,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       short replication,
       long blockSize,
       Progressable progress) throws IOException {
+    entryPoint(INVOCATION_CREATE_NON_RECURSIVE);
     Path parent = path.getParent();
     if (parent != null) {
       // expect this to raise an exception if there is no parent
@@ -1683,7 +1684,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Retries.RetryTranslated
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
-      checkNotClosed();
+      entryPoint(INVOCATION_DELETE);
       return innerDelete(innerGetFileStatus(f, true), recursive);
     } catch (FileNotFoundException e) {
       LOG.debug("Couldn't delete {} - does not exist", f);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b62a5ece/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index d843347..b883455 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
-import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
@@ -129,8 +128,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
   private final MutableCounterLong numberOfFakeDirectoryDeletes;
   private final MutableCounterLong numberOfDirectoriesCreated;
   private final MutableCounterLong numberOfDirectoriesDeleted;
-  private final Map<String, MutableCounterLong> streamMetrics =
-      new HashMap<>(30);
 
   /** Instantiate this without caring whether or not S3Guard is enabled. */
   private final S3GuardInstrumentation s3GuardInstrumentation
@@ -138,6 +135,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
 
   private static final Statistic[] COUNTERS_TO_CREATE = {
       INVOCATION_COPY_FROM_LOCAL_FILE,
+      INVOCATION_CREATE,
+      INVOCATION_CREATE_NON_RECURSIVE,
+      INVOCATION_DELETE,
       INVOCATION_EXISTS,
       INVOCATION_GET_FILE_STATUS,
       INVOCATION_GLOB_STATUS,
@@ -147,6 +147,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       INVOCATION_LIST_LOCATED_STATUS,
       INVOCATION_LIST_STATUS,
       INVOCATION_MKDIRS,
+      INVOCATION_OPEN,
       INVOCATION_RENAME,
       OBJECT_COPY_REQUESTS,
       OBJECT_DELETE_REQUESTS,
@@ -196,27 +197,27 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
         "A unique identifier for the instance",
         fileSystemInstanceId.toString());
     registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", name.getHost());
-    streamOpenOperations = streamCounter(STREAM_OPENED);
-    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
-    streamClosed = streamCounter(STREAM_CLOSED);
-    streamAborted = streamCounter(STREAM_ABORTED);
-    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
-    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
+    streamOpenOperations = counter(STREAM_OPENED);
+    streamCloseOperations = counter(STREAM_CLOSE_OPERATIONS);
+    streamClosed = counter(STREAM_CLOSED);
+    streamAborted = counter(STREAM_ABORTED);
+    streamSeekOperations = counter(STREAM_SEEK_OPERATIONS);
+    streamReadExceptions = counter(STREAM_READ_EXCEPTIONS);
     streamForwardSeekOperations =
-        streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
+        counter(STREAM_FORWARD_SEEK_OPERATIONS);
     streamBackwardSeekOperations =
-        streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
-    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
+        counter(STREAM_BACKWARD_SEEK_OPERATIONS);
+    streamBytesSkippedOnSeek = counter(STREAM_SEEK_BYTES_SKIPPED);
     streamBytesBackwardsOnSeek =
-        streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
-    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
-    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
+        counter(STREAM_SEEK_BYTES_BACKWARDS);
+    streamBytesRead = counter(STREAM_SEEK_BYTES_READ);
+    streamReadOperations = counter(STREAM_READ_OPERATIONS);
     streamReadFullyOperations =
-        streamCounter(STREAM_READ_FULLY_OPERATIONS);
+        counter(STREAM_READ_FULLY_OPERATIONS);
     streamReadsIncomplete =
-        streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
-    streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ);
-    streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED);
+        counter(STREAM_READ_OPERATIONS_INCOMPLETE);
+    streamBytesReadInClose = counter(STREAM_CLOSE_BYTES_READ);
+    streamBytesDiscardedInAbort = counter(STREAM_ABORT_BYTES_DISCARDED);
     numberOfFilesCreated = counter(FILES_CREATED);
     numberOfFilesCopied = counter(FILES_COPIED);
     bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
@@ -283,20 +284,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
   }
 
   /**
-   * Create a counter in the stream map: these are unregistered in the public
-   * metrics.
-   * @param name counter name
-   * @param desc counter description
-   * @return a new counter
-   */
-  protected final MutableCounterLong streamCounter(String name, String desc) {
-    MutableCounterLong counter = new MutableCounterLong(
-        Interns.info(name, desc), 0L);
-    streamMetrics.put(name, counter);
-    return counter;
-  }
-
-  /**
    * Create a counter in the registry.
    * @param op statistic to count
    * @return a new counter
@@ -306,16 +293,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
   }
 
   /**
-   * Create a counter in the stream map: these are unregistered in the public
-   * metrics.
-   * @param op statistic to count
-   * @return a new counter
-   */
-  protected final MutableCounterLong streamCounter(Statistic op) {
-    return streamCounter(op.getSymbol(), op.getDescription());
-  }
-
-  /**
    * Create a gauge in the registry.
    * @param name name gauge name
    * @param desc description
@@ -365,11 +342,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
         prefix,
         separator, suffix);
     registry.snapshot(metricBuilder, all);
-    for (Map.Entry<String, MutableCounterLong> entry:
-        streamMetrics.entrySet()) {
-      metricBuilder.tuple(entry.getKey(),
-          Long.toString(entry.getValue().value()));
-    }
     return metricBuilder.toString();
   }
 
@@ -447,9 +419,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
    */
   public MutableMetric lookupMetric(String name) {
     MutableMetric metric = getRegistry().get(name);
-    if (metric == null) {
-      metric = streamMetrics.get(name);
-    }
     return metric;
   }
 
@@ -1141,10 +1110,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
   public Map<String, Long> toMap() {
     MetricsToMap metricBuilder = new MetricsToMap(null);
     registry.snapshot(metricBuilder, true);
-    for (Map.Entry<String, MutableCounterLong> entry :
-        streamMetrics.entrySet()) {
-      metricBuilder.tuple(entry.getKey(), entry.getValue().value());
-    }
     return metricBuilder.getMap();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b62a5ece/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 871d7c5..bb30f1f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -49,6 +49,12 @@ public enum Statistic {
   IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"),
   INVOCATION_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE,
       "Calls of copyFromLocalFile()"),
+  INVOCATION_CREATE(CommonStatisticNames.OP_CREATE,
+      "Calls of create()"),
+  INVOCATION_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE,
+      "Calls of createNonRecursive()"),
+  INVOCATION_DELETE(CommonStatisticNames.OP_DELETE,
+      "Calls of delete()"),
   INVOCATION_EXISTS(CommonStatisticNames.OP_EXISTS,
       "Calls of exists()"),
   INVOCATION_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS,
@@ -67,6 +73,8 @@ public enum Statistic {
       "Calls of listStatus()"),
   INVOCATION_MKDIRS(CommonStatisticNames.OP_MKDIRS,
       "Calls of mkdirs()"),
+  INVOCATION_OPEN(CommonStatisticNames.OP_OPEN,
+      "Calls of open()"),
   INVOCATION_RENAME(CommonStatisticNames.OP_RENAME,
       "Calls of rename()"),
   OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b62a5ece/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 182990c..e92ce78 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * Test s3a performance metrics register and output.
@@ -34,7 +35,7 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
   public void testMetricsRegister()
       throws IOException, InterruptedException {
     S3AFileSystem fs = getFileSystem();
-    Path dest = new Path("newfile1");
+    Path dest = path("testMetricsRegister");
     ContractTestUtils.touch(fs, dest);
 
     String targetMetricSource = "S3AMetrics1" + "-" + fs.getBucket();
@@ -48,4 +49,25 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
     assertEquals("Metrics system should report single file created event",
         1, fileCreated.value());
   }
+
+  @Test
+  public void testStreamStatistics() throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    Path file = path("testStreamStatistics");
+    byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();
+    ContractTestUtils.createFile(fs, file, false, data);
+
+    try (InputStream inputStream = fs.open(file)) {
+      while (inputStream.read(data) != -1) {
+        LOG.debug("Read batch of data from input stream...");
+      }
+    }
+
+    MutableCounterLong read = (MutableCounterLong)
+        fs.getInstrumentation().getRegistry()
+        .get(Statistic.STREAM_SEEK_BYTES_READ.getSymbol());
+    assertEquals("Stream statistics were not merged", 26, read.value());
+  }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org