Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/08/18 00:28:39 UTC

[GitHub] [hadoop] jimmy-zuber-amzn commented on a change in pull request #2069: HADOOP-16830. IOStatistics API.

jimmy-zuber-amzn commented on a change in pull request #2069:
URL: https://github.com/apache/hadoop/pull/2069#discussion_r471833875



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
##########
@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
     throws IOException {
     readFully(position, buffer, 0, buffer.length);
   }
+
+  /**
+   * toString method returns the superclass toString, but if the subclass
+   * implements {@link IOStatisticsSource} then those statistics are
+   * extracted and included in the output.
+   * That is: statistics of subclasses are automatically reported.
+   * @return a string value.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(super.toString());
+    sb.append('{');
+    if (this instanceof IOStatisticsSource) {
+      sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
+          (IOStatisticsSource) this));
+    }
+    sb.append('}');
+    return sb.toString();
+  }

Review comment:
       It looks like this IOStatistics API primarily surfaces information through `toString()` output in log statements. Is this the best way to integrate with Hadoop, or is there value in adding an explicit (possibly type-safe) interface for objects to expose their IOStatistics information?
   
   Also, does it make sense here to move the calls to `sb#append` inside the `if` statement? That way, classes that do not implement `IOStatisticsSource` will not have their string representation changed.
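
A minimal sketch of that second suggestion. All types here (`StatsSource`, `BaseStream`, `PlainStream`, `CountingStream`) are hypothetical stand-ins, not the Hadoop classes, and `name()` replaces `super.toString()` only so that the output is deterministic:

```java
/** Hypothetical stand-in for IOStatisticsSource; not the Hadoop interface. */
interface StatsSource {
  String statsToString();
}

/** Hypothetical base class playing the role of FSInputStream. */
abstract class BaseStream {

  /** Used instead of Object.toString() so the result is deterministic. */
  protected abstract String name();

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(name());
    // Braces are appended only when this instance exposes statistics,
    // so other subclasses keep their original string form.
    if (this instanceof StatsSource) {
      sb.append('{')
          .append(((StatsSource) this).statsToString())
          .append('}');
    }
    return sb.toString();
  }
}

/** A stream without statistics: toString() is left undecorated. */
final class PlainStream extends BaseStream {
  @Override
  protected String name() {
    return "PlainStream";
  }
}

/** A stream with statistics: toString() gains a {...} suffix. */
final class CountingStream extends BaseStream implements StatsSource {
  @Override
  protected String name() {
    return "CountingStream";
  }

  @Override
  public String statsToString() {
    return "(bytes_read=42)";
  }
}
```

With this shape, a subclass that is not a statistics source no longer gains a trailing `{}`.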

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A map of functions which can be invoked to dynamically
+ * create the value of an entry.
+ * @param <E> type of entry value.
+ */
+final class EvaluatingStatisticsMap<E extends Serializable> implements
+    Map<String, E> {
+
+  /**
+   * Functions to invoke when evaluating keys.
+   */
+  private final Map<String, Function<String, E>> evaluators
+      = new TreeMap<>();
+
+  /**
+   * Function to use when copying map values.
+   */
+  private final Function<E, E> copyFn;
+
+  /**
+   * Name for use in getter/error messages.
+   */
+  private final String name;
+
+  EvaluatingStatisticsMap(final String name) {
+    this(name, IOStatisticsBinding::passthroughFn);
+  }
+
+  EvaluatingStatisticsMap(final String name,
+      final Function<E, E> copyFn) {
+    this.name = name;
+    this.copyFn = copyFn;
+  }
+
+  /**
+   * add a mapping of a key to a function.
+   * @param key the key
+   * @param eval the evaluator
+   */
+  void addFunction(String key, Function<String, E> eval) {
+    evaluators.put(key, eval);
+  }

Review comment:
       Since `evaluators` is a `TreeMap`, concurrent calls to this method may produce incorrect results, because `TreeMap` is not thread-safe. Additionally, calls to `#keySet`, `#entries`, and `#snapshot` may throw `ConcurrentModificationException`s. Given that there's no ordering requirement in the representation of this map, could a `ConcurrentHashMap` be used instead?
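
A rough sketch of what the swap could look like. `EvaluatorRegistry` and `EvaluatorRegistryDemo` are hypothetical, simplified stand-ins for illustration; they are not the class in the PR and omit the `Map` interface it implements:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for EvaluatingStatisticsMap. */
final class EvaluatorRegistry<E> {

  /**
   * ConcurrentHashMap tolerates concurrent put/get, and its iterators are
   * weakly consistent: they do not throw ConcurrentModificationException.
   */
  private final Map<String, Function<String, E>> evaluators =
      new ConcurrentHashMap<>();

  void addFunction(String key, Function<String, E> eval) {
    evaluators.put(key, eval);
  }

  /** Evaluate the function bound to a key, or return null if there is none. */
  E get(String key) {
    Function<String, E> fn = evaluators.get(key);
    return fn == null ? null : fn.apply(key);
  }

  int size() {
    return evaluators.size();
  }
}

final class EvaluatorRegistryDemo {

  private EvaluatorRegistryDemo() {
  }

  /** Register keys from several threads at once; return the final size. */
  static int registerConcurrently(int threads, int perThread) {
    EvaluatorRegistry<Long> registry = new EvaluatorRegistry<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      final int id = t;
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          final long value = i;
          registry.addFunction("stat_" + id + "_" + i, k -> value);
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return registry.size();
  }
}
```

If sorted iteration turns out to matter (for example, for stable log output), `ConcurrentSkipListMap` would be the ordered alternative.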

##########
File path: hadoop-common-project/hadoop-common/src/site/markdown/filesystem/iostatistics.md
##########
@@ -0,0 +1,432 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Statistic collection with the IOStatistics API
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+```
+
+The `IOStatistics` API is intended to provide statistics on individual IO
+classes, such as input and output streams, *in a standard way which
+applications can query*.
+
+Many filesystem-related classes have implemented statistics gathering
+and provided private/unstable ways to query this, but as they were
+not common across implementations it was unsafe for applications
+to reference these values. Example: `S3AInputStream` and its statistics
+API. This is used in internal tests, but cannot be used downstream in
+applications such as Apache Hive or Apache HBase.
+
+The IOStatistics API is intended to:
+
+1. Be instance-specific, rather than shared across multiple instances
+   of a class, or thread local.
+1. Be public and stable enough to be used by applications.
+1. Be easy to use in applications written in Java, Scala, and, via libhdfs, C/C++.
+1. Have foundational interfaces and classes in the `hadoop-common` JAR.
+
+## Core Model
+
+Any class *may* implement `IOStatisticsSource` in order to
+provide statistics.
+
+Wrapper I/O classes such as `FSDataInputStream` and `FSDataOutputStream` *should*
+implement the interface and forward it to the wrapped class if that class also
+implements it, and return `null` if it does not.
+
+The `getIOStatistics()` method of an `IOStatisticsSource` implementation returns an
+instance of `IOStatistics` enumerating the statistics of that specific
+instance.
+
+The `IOStatistics` interface exports five kinds of statistic:
+
+
+| Category | Type | Description |
+|------|------|-------------|
+| `counter`        | `long`          | a counter which may increase in value; SHOULD BE >= 0 |
+| `gauge`          | `long`          | an arbitrary value which can go down as well as up; SHOULD BE >= 0 |
+| `minimum`        | `long`          | a minimum value; MAY BE negative |
+| `maximum`        | `long`          | a maximum value;  MAY BE negative |
+| `meanStatistic` | `MeanStatistic` | an arithmetic mean and sample size; mean MAY BE negative|
+
+Four are simple `long` values; they differ in how they are likely to
+change and how they are aggregated.
+
+
+#### Aggregation of Statistic Values
+
+For each statistic category, the result of `aggregate(x, y)` is:
+
+| Category         | Aggregation |
+|------------------|-------------|
+| `counter`        | `min(0, x) + min(0, y)`     |

Review comment:
       Don't you mean `max(0, x)`?
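
To make the difference concrete, here is a small comparison of the two formulas. `CounterAggregation` and its method names are hypothetical and exist only for this illustration:

```java
/** Hypothetical helper comparing the documented and suggested formulas. */
final class CounterAggregation {

  private CounterAggregation() {
  }

  /** As written in the table: every non-negative input is clamped to 0. */
  static long asDocumented(long x, long y) {
    return Math.min(0L, x) + Math.min(0L, y);
  }

  /** As suggested: negative inputs are ignored, positive ones are summed. */
  static long asSuggested(long x, long y) {
    return Math.max(0L, x) + Math.max(0L, y);
  }
}
```

For two counters with values 3 and 5, `asDocumented` yields 0 while `asSuggested` yields 8, which is what a counter aggregate would be expected to report.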

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+import static org.apache.hadoop.fs.statistics.IOStatistics.MIN_UNSET_VALUE;
+
+/**
+ * Support for implementing IOStatistics interfaces.
+ */
+public final class IOStatisticsBinding {
+
+  /** Pattern used for each entry. */
+  public static final String ENTRY_PATTERN = "(%s=%s)";
+
+  /** String to return when a source is null. */
+  @VisibleForTesting
+  public static final String NULL_SOURCE = "()";
+
+  private IOStatisticsBinding() {
+  }
+
+  /**
+   * Create  IOStatistics from a storage statistics instance.
+   * <p></p>
+   * This will be updated as the storage statistics change.
+   * @param storageStatistics source data.
+   * @return an IO statistics source.
+   */
+  public static IOStatistics fromStorageStatistics(
+      StorageStatistics storageStatistics) {
+    DynamicIOStatisticsBuilder builder = dynamicIOStatistics();
+    Iterator<StorageStatistics.LongStatistic> it = storageStatistics
+        .getLongStatistics();
+    while (it.hasNext()) {
+      StorageStatistics.LongStatistic next = it.next();
+      builder.withLongFunctionCounter(next.getName(),
+          k -> storageStatistics.getLong(k));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Create a builder for dynamic IO Statistics.
+   * @return a builder to be completed.
+   */
+  public static DynamicIOStatisticsBuilder dynamicIOStatistics() {
+    return new DynamicIOStatisticsBuilder();
+  }
+
+  /**
+   * Get the shared instance of the immutable empty statistics
+   * object.
+   * @return an empty statistics object.
+   */
+  public static IOStatistics emptyStatistics() {
+    return EmptyIOStatistics.getInstance();
+  }
+
+  /**
+   * Take an IOStatistics instance and wrap it in a source.
+   * @param statistics statistics.
+   * @return a source which will return the values
+   */
+  public static IOStatisticsSource wrap(IOStatistics statistics) {
+    return new SourceWrappedStatistics(statistics);
+  }
+
+  /**
+   * Create a builder for an {@link IOStatisticsStore}.
+   *
+   * @return a builder instance.
+   */
+  public static IOStatisticsStoreBuilder iostatisticsStore() {
+    return new IOStatisticsStoreBuilderImpl();
+  }
+
+  /**
+   * Convert an entry to the string format used in logging.
+   *
+   * @param entry entry to evaluate
+   * @param <E> entry type
+   * @return formatted string
+   */
+  public static <E> String entryToString(
+      final Map.Entry<String, E> entry) {
+    return entryToString(entry.getKey(), entry.getValue());
+  }
+
+  /**
+   * Convert entry values to the string format used in logging.
+   *
+   * @param name statistic name
+   * @param value stat value
+   * @return formatted string
+   */
+  public static <E> String entryToString(
+      final String name, final E value) {
+    return String.format(
+        ENTRY_PATTERN,
+        name,
+        value);
+  }
+
+  /**
+   * Copy into the dest map all the source entries.
+   * The destination is cleared first.
+   * @param <E> entry type
+   * @param dest destination of the copy
+   * @param source source
+   * @param copyFn function to copy entries
+   * @return the destination.
+   */
+  private static <E> Map<String, E> copyMap(
+      Map<String, E> dest,
+      Map<String, E> source,
+      Function<E, E> copyFn) {
+    // we have to clone the values so that they aren't
+    // bound to the original values
+    dest.clear();
+    source.entrySet()
+        .forEach(entry ->
+            dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
+    return dest;
+  }
+
+  /**
+   * A passthrough copy operation suitable for immutable
+   * types, including numbers.
+   * @param src source object
+   * @return the source object
+   */
+  public static <E extends Serializable> E passthroughFn(E src) {
+    return src;
+  }
+
+  /**
+   * Take a snapshot of a supplied map, where the copy option simply
+   * uses the existing value.
+   * <p></p>
+   * For this to be safe, the map must refer to immutable objects.
+   * @param source source map
+   * @param <E> type of values.
+   * @return a new map referencing the same values.
+   */
+  public static <E extends Serializable> TreeMap<String, E> snapshotMap(
+      Map<String, E> source) {
+    return snapshotMap(source,
+        IOStatisticsBinding::passthroughFn);
+  }
+
+  /**
+   * Take a snapshot of a supplied map, using the copy function
+   * to replicate the source values.
+   * @param source source map
+   * @param copyFn function to copy the value
+   * @param <E> type of values.
+   * @return a new map referencing the same values.
+   */
+  public static <E extends Serializable> TreeMap<String, E> snapshotMap(
+      Map<String, E> source,
+      Function<E, E> copyFn) {
+    TreeMap<String, E> dest = new TreeMap<>();
+    copyMap(dest, source, copyFn);
+    return dest;
+  }
+
+  /**
+   * Aggregate two maps so that the destination.
+   * @param <E> type of values
+   * @param dest destination map.
+   * @param other other map
+   * @param aggregateFn function to aggregate the values.
+   * @param copyFn function to copy the value
+   */
+  public static <E> void aggregateMaps(
+      Map<String, E> dest,
+      Map<String, E> other,
+      BiFunction<E, E, E> aggregateFn,
+      final Function<E, E> copyFn) {
+    // scan through the other hand map; copy
+    // any values not in the left map,
+    // aggregate those for which there is already
+    // an entry
+    other.entrySet().forEach(entry -> {
+      String key = entry.getKey();
+      E rVal = entry.getValue();
+      E lVal = dest.get(key);
+      if (lVal == null) {
+        dest.put(key, copyFn.apply(rVal));
+      } else {
+        dest.put(key, aggregateFn.apply(lVal, rVal));
+      }
+    });
+  }
+
+  public static Long aggregateCounters(Long l, Long r) {

Review comment:
       Are any of these `Long`s nullable? It looks like arithmetic on a null `Long` throws a `NullPointerException` when the value is auto-unboxed:
   
   https://repl.it/repls/LongModernCase#Main.java
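
The same behaviour, plus one possible guard, sketched locally. `NullSafeCounters` is a hypothetical helper; treating `null` as 0 is an assumption made for the example, not something the PR specifies, and the body of `aggregateCounters` is not shown in this hunk:

```java
/** Hypothetical helper; not part of the PR. */
final class NullSafeCounters {

  private NullSafeCounters() {
  }

  /** Unboxes both operands; throws NullPointerException if either is null. */
  static Long naiveAdd(Long l, Long r) {
    return l + r;
  }

  /** Assumption for this sketch: a null operand contributes 0. */
  static Long nullSafeAdd(Long l, Long r) {
    long left = (l == null) ? 0L : l;
    long right = (r == null) ? 0L : r;
    return left + right;
  }
}
```

Whether a guard like this is needed depends on whether the maps being aggregated can ever hold null values.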

##########
File path: hadoop-common-project/hadoop-common/src/site/markdown/filesystem/iostatistics.md
##########
@@ -0,0 +1,432 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Statistic collection with the IOStatistics API
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+```
+
+The `IOStatistics` API is intended to provide statistics on individual IO
+classes, such as input and output streams, *in a standard way which
+applications can query*.
+
+Many filesystem-related classes have implemented statistics gathering
+and provided private/unstable ways to query this, but as they were
+not common across implementations it was unsafe for applications
+to reference these values. Example: `S3AInputStream` and its statistics
+API. This is used in internal tests, but cannot be used downstream in
+applications such as Apache Hive or Apache HBase.
+
+The IOStatistics API is intended to:
+
+1. Be instance-specific, rather than shared across multiple instances
+   of a class, or thread local.
+1. Be public and stable enough to be used by applications.
+1. Be easy to use in applications written in Java, Scala, and, via libhdfs, C/C++.
+1. Have foundational interfaces and classes in the `hadoop-common` JAR.
+
+## Core Model
+
+Any class *may* implement `IOStatisticsSource` in order to
+provide statistics.
+
+Wrapper I/O classes such as `FSDataInputStream` and `FSDataOutputStream` *should*
+implement the interface and forward it to the wrapped class if that class also
+implements it, and return `null` if it does not.
+
+The `getIOStatistics()` method of an `IOStatisticsSource` implementation returns an
+instance of `IOStatistics` enumerating the statistics of that specific
+instance.
+
+The `IOStatistics` interface exports five kinds of statistic:
+
+
+| Category | Type | Description |
+|------|------|-------------|
+| `counter`        | `long`          | a counter which may increase in value; SHOULD BE >= 0 |
+| `gauge`          | `long`          | an arbitrary value which can go down as well as up; SHOULD BE >= 0 |
+| `minimum`        | `long`          | a minimum value; MAY BE negative |
+| `maximum`        | `long`          | a maximum value;  MAY BE negative |
+| `meanStatistic` | `MeanStatistic` | an arithmetic mean and sample size; mean MAY BE negative|
+
+Four are simple `long` values; they differ in how they are likely to
+change and how they are aggregated.
+
+
+#### Aggregation of Statistic Values
+
+For each statistic category, the result of `aggregate(x, y)` is:
+
+| Category         | Aggregation |
+|------------------|-------------|
+| `counter`        | `min(0, x) + min(0, y)`     |
+| `gauge`          | `min(0, x) + min(0, y)` |

Review comment:
       This doesn't match the aggregation I saw above.

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A map of functions which can be invoked to dynamically
+ * create the value of an entry.
+ * @param <E> type of entry value.
+ */
+final class EvaluatingStatisticsMap<E extends Serializable> implements

Review comment:
       Does this class have thread safety requirements / assumptions?

##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDynamicIOStatistics.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.statistics.impl.SourceWrappedStatistics;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertCounterStatisticIsTracked;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertCounterStatisticIsUntracked;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyCounterStatisticValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatisticsSource;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.ENTRY_PATTERN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.NULL_SOURCE;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.dynamicIOStatistics;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * verify dynamic statistics are dynamic, except when you iterate through
+ * them, along with other tests of the class's behavior.
+ */
+public class TestDynamicIOStatistics extends AbstractHadoopTestBase {

Review comment:
       Worth testing some of the thread safety assertions here?
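
One shape such a test could take, sketched without any Hadoop or JUnit dependencies. `ConcurrentSnapshotCheck` is hypothetical, and a `ConcurrentHashMap` of `AtomicLong` stands in for the statistics implementation under test:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical concurrency check; the map stands in for the real class. */
final class ConcurrentSnapshotCheck {

  private ConcurrentSnapshotCheck() {
  }

  /**
   * Increment one shared counter from several threads while each thread
   * also copies the map, then return {finalCount, failures}.
   */
  static long[] run(int writers, int incrementsPerWriter) {
    Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    AtomicLong failures = new AtomicLong();
    Thread[] threads = new Thread[writers];
    for (int t = 0; t < writers; t++) {
      threads[t] = new Thread(() -> {
        try {
          for (int i = 0; i < incrementsPerWriter; i++) {
            counters.computeIfAbsent("ops", k -> new AtomicLong())
                .incrementAndGet();
            // Copy the map while other threads are still writing to it.
            Map<String, AtomicLong> snapshot = new TreeMap<>(counters);
            if (snapshot.isEmpty()) {
              failures.incrementAndGet();
            }
          }
        } catch (RuntimeException e) {
          // Includes ConcurrentModificationException from an unsafe map.
          failures.incrementAndGet();
        }
      });
      threads[t].start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    AtomicLong ops = counters.get("ops");
    return new long[] {ops == null ? 0L : ops.get(), failures.get()};
  }
}
```

A real test would assert that the final count equals `writers * incrementsPerWriter` and that no failures were recorded.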

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
##########
@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
     throws IOException {
     readFully(position, buffer, 0, buffer.length);
   }
+
+  /**
+   * toString method returns the superclass toString, but if the subclass
+   * implements {@link IOStatisticsSource} then those statistics are
+   * extracted and included in the output.
+   * That is: statistics of subclasses are automatically reported.
+   * @return a string value.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(super.toString());
+    sb.append('{');
+    if (this instanceof IOStatisticsSource) {
+      sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
+          (IOStatisticsSource) this));
+    }
+    sb.append('}');
+    return sb.toString();
+  }

Review comment:
       Ah, never mind: I see the `IOStatisticsSnapshot` class is serializable, which allows inter-process communication of this data format and provides an API to use.

##########
File path: hadoop-common-project/hadoop-common/src/site/markdown/filesystem/iostatistics.md
##########
@@ -0,0 +1,432 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Statistic collection with the IOStatistics API
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+```
+
+The `IOStatistics` API is intended to provide statistics on individual IO
+classes, such as input and output streams, *in a standard way which
+applications can query*.
+
+Many filesystem-related classes have implemented statistics gathering
+and provided private/unstable ways to query this, but as they were
+not common across implementations it was unsafe for applications
+to reference these values. Example: `S3AInputStream` and its statistics
+API. This is used in internal tests, but cannot be used downstream in
+applications such as Apache Hive or Apache HBase.
+
+The IOStatistics API is intended to:
+
+1. Be instance-specific, rather than shared across multiple instances
+   of a class, or thread local.
+1. Be public and stable enough to be used by applications.
+1. Be easy to use in applications written in Java, Scala, and, via libhdfs, C/C++.
+1. Have foundational interfaces and classes in the `hadoop-common` JAR.
+
+## Core Model
+
+Any class *may* implement `IOStatisticsSource` in order to
+provide statistics.
+
+Wrapper I/O classes such as `FSDataInputStream` and `FSDataOutputStream` *should*
+implement the interface and forward it to the wrapped class if that class also
+implements it, and return `null` if it does not.
+
+The `getIOStatistics()` method of an `IOStatisticsSource` implementation returns an
+instance of `IOStatistics` enumerating the statistics of that specific
+instance.
+
+The `IOStatistics` interface exports five kinds of statistic:
+
+
+| Category | Type | Description |
+|------|------|-------------|
+| `counter`        | `long`          | a counter which may increase in value; SHOULD BE >= 0 |
+| `gauge`          | `long`          | an arbitrary value which can go down as well as up; SHOULD BE >= 0 |
+| `minimum`        | `long`          | a minimum value; MAY BE negative |
+| `maximum`        | `long`          | a maximum value;  MAY BE negative |
+| `meanStatistic` | `MeanStatistic` | an arithmetic mean and sample size; mean MAY BE negative|
+
+Four are simple `long` values; they differ in how they are likely to
+change and how they are aggregated.
+
+
+#### Aggregation of Statistic Values
+
+For each statistic category, the result of `aggregate(x, y)` is:
+
+| Category         | Aggregation |
+|------------------|-------------|
+| `counter`        | `min(0, x) + min(0, y)`     |
+| `gauge`          | `min(0, x) + min(0, y)` |
+| `minimum`        | `min(x, y)` |
+| `maximum`        | `max(x, y)` |
+| `meanStatistic` | calculation of the mean of `x` and `y` |
+
+
+#### Class `MeanStatistic`
+
+## package `org.apache.hadoop.fs.statistics`
+
+This package contains the public statistics APIs intended
+for use by applications.
+
+<!--  ============================================================= -->
+<!--  Class: MeanStatistic -->
+<!--  ============================================================= -->
+
+`MeanStatistic` is a tuple of `(mean, samples)` to support aggregation.
+
+A `MeanStatistic` with a sample count of `0` is considered an empty statistic.
+
+All `MeanStatistic` instances where `samples = 0` are considered equal,
+irrespective of the `mean` value.
+
+Algorithm to calculate the mean:
+
+```python
+if x.samples == 0:
+    y
+else if y.samples == 0:
+    x
+else:
+    samples' = x.samples + y.samples
+    mean' = ((x.mean * x.samples) + (y.mean * y.samples)) / samples'
+    (mean', samples')
+```
+
+Implicitly, this means that if both samples are empty, then the aggregate value is also empty.
+
+```java
+public final class MeanStatistic implements Serializable, Cloneable {
+  /**
+   * Arithmetic mean.
+   */
+  private double mean;
+
+  /**
+   * Number of samples used to calculate
+   * the mean.
+   */
+  private long samples;
+
+  /**
+   * Get the mean value.
+   * @return the mean
+   */
+  public double getMean() {
+    return mean;
+  }
+
+  /**
+   * Get the sample count.
+   * @return the sample count; 0 means empty
+   */
+  public long getSamples() {
+    return samples;
+  }
+
+  /**
+   * Is a statistic empty?
+   * @return true if the sample count is 0
+   */
+  public boolean isEmpty() {
+    return samples == 0;
+  }
+  /**
+   * Add another mean statistic to create a new statistic.
+   * When adding two statistics, if either is empty then
+   * a copy of the non-empty statistic is returned.
+   * If both are empty then a new empty statistic is returned.
+   *
+   * @param other other value
+   * @return the aggregate mean
+   */
+  public MeanStatistic add(final MeanStatistic other) {
+    /* Implementation elided. */
+  }
+  @Override
+  public int hashCode() {
+    return Objects.hash(mean, samples);
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) { return true; }
+    if (o == null || getClass() != o.getClass()) { return false; }
+    MeanStatistic that = (MeanStatistic) o;
+    if (this.isEmpty()) {
+      return that.isEmpty();
+    }
+    return Double.compare(that.mean, mean) == 0 &&
+        samples == that.samples;
+  }
+
+  @Override
+  public MeanStatistic clone() {
+    return new MeanStatistic(this);
+  }
+
+  public MeanStatistic copy() {
+    return new MeanStatistic(this);
+  }
+
+}
+```
+
+<!--  ============================================================= -->
+<!--  Interface: IOStatisticsSource -->
+<!--  ============================================================= -->
+
+### interface `org.apache.hadoop.fs.statistics.IOStatisticsSource`
+
+```java
+
+/**
+ * A source of IO statistics.
+ * These statistics MUST be instance specific, not thread local.
+ */
+@InterfaceStability.Unstable
+public interface IOStatisticsSource {
+
+  /**
+   * Return a statistics instance.
+   * It is not a requirement that the same instance is returned every time.
+   * {@link IOStatisticsSource}.
+   * If the object implementing this is Closeable, this method
+   * may return null if invoked on a closed object, even if
+   * it returns a valid instance when called earlier.
+   * @return an IOStatistics instance or null
+   */
+  IOStatistics getIOStatistics();
+}
+```
+
+This is the interface which an object MUST implement if it is a source of
+IOStatistics information.
+
+#### Invariants
+
+The result of `getIOStatistics()` must be one of
+
+* `null`
+* an immutable `IOStatistics` for which each map of entries is
+an empty map.
+* an instance of an `IOStatistics` whose statistics MUST BE unique to that
+instance of the class implementing `IOStatisticsSource`.
+
+Less formally: if the statistics maps returned are non-empty, all the statistics
+must be collected from the current instance, and not from any other instances, the way
+some of the `FileSystem` statistics are collected.
+
+
+The result of `getIOStatistics()`, if non-null, MAY be a different instance
+on every invocation.
+
+
+<!--  ============================================================= -->
+<!--  Interface: IOStatistics -->
+<!--  ============================================================= -->
+
+### class `org.apache.hadoop.fs.statistics.IOStatistics`
+
+These are per-instance statistics provided by an object which
+implements `IOStatisticsSource`.
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface IOStatistics {
+
+  /**
+   * Map of counters.
+   * @return the current map of counters.
+   */
+  Map<String, Long> counters();
+
+  /**
+   * Map of gauges.
+   * @return the current map of gauges.
+   */
+  Map<String, Long> gauges();
+
+  /**
+   * Map of minimums.
+   * @return the current map of minimums.
+   */
+  Map<String, Long> minimums();
+
+  /**
+   * Map of maximums.
+   * @return the current map of maximums.
+   */
+  Map<String, Long> maximums();
+
+  /**
+   * Map of meanStatistics.
+   * @return the current map of MeanStatistic statistics.
+   */
+  Map<String, MeanStatistic> meanStatistics();
+
+}
+```
+
+### Statistic Naming
+
+The naming policy of statistics is designed to be readable, shareable
+and ideally consistent across `IOStatisticsSource` implementations.
+
+* Characters in key names MUST match the regular expression
+  `[a-z0-9_]` with the exception of the first character, which
+  MUST be in the range `[a-z]`. Thus the full regular expression
+  for a valid statistic name is:
+
+        [a-z][a-z0-9_]+
+
+* Where possible, the names of statistics SHOULD be those defined
+  with common names.
+
+        org.apache.hadoop.fs.statistics.StreamStatisticNames
+        org.apache.hadoop.fs.statistics.StoreStatisticNames
+
+   Note 1: these are evolving; for clients to safely reference their
+   statistics by name they SHOULD be copied to the application.
+   (i.e. for an application compiled against Hadoop 3.4.2 to link against
+   Hadoop 3.4.1, copy the strings).
+ 
+   Note 2: keys defined in these classes SHALL NOT be removed
+   from subsequent Hadoop releases.
+
+* A common statistic name MUST NOT be used to report any other statistic and
+  MUST use the pre-defined unit of measurement.
+
+* A statistic name in one of the maps SHOULD NOT be re-used in another map.
+  This aids diagnostics of logged statistics.
+
+### Statistic Maps
+
+For each map of statistics returned:
+
+* The operations to add/remove entries are unsupported: the map returned
+  MAY be mutable by the source of statistics.
+
+* The map MAY be empty.
+
+* The map keys each represent a measured statistic.
+
+* The set of keys in a map SHOULD remain unchanged; keys MUST NOT be removed.
+
+* The statistics SHOULD be dynamic: every lookup of an entry SHOULD
+  return the latest value.
+
+* The values MAY change across invocations of `Map.values()` and `Map.entrySet()`.
+
+* The update MAY be in the `iterator()` calls of the collections returned,
+  or MAY be in the actual `Iterator.next()` operation. That is: there is
+  no guarantee as to when the evaluation takes place.
+
+* The returned `Map.Entry` instances MUST return the same value on
+ repeated `getValue()` calls.
+
+* Queries of statistics SHOULD be fast and non-blocking to the extent
+ that if invoked during a long operation, they will prioritize
+ returning fast over most timely values.
+
+* The statistics MAY lag; especially for statistics collected in separate
+ operations (e.g. stream IO statistics as provided by a filesystem
+ instance).
+
+* Statistics which represent time SHOULD use milliseconds as their unit.
+
+* Statistics which represent time and use a different unit MUST document
+  the unit used.
+
+### Thread Model
+
+1. An instance of `IOStatistics` can be shared across threads;
+
+1. Read access to the supplied statistics maps must be thread safe.

Review comment:
       Does the current version support this? I see we're using regular (TreeMap or HashMap) map impls that may break if you try to iterate over them while someone else is writing.
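
To make the concern concrete, here is a minimal, single-threaded sketch of the fail-fast behaviour of `HashMap` iterators compared with the weakly consistent iterators of `ConcurrentHashMap`. The class name, method name and statistic keys are hypothetical and are not part of the patch. With real concurrent writers the `HashMap` behaviour is undefined rather than a guaranteed exception, since fail-fast detection is only best-effort.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo class; not part of the patch under review. */
final class MapIterationDemo {

  /**
   * Iterate over the keys of the map, adding one new key part-way through.
   * @return true if the iteration completed without an exception.
   */
  static boolean iterateWhileAdding(Map<String, Long> map) {
    map.put("bytes_read", 1L);
    map.put("seek_operations", 2L);
    try {
      for (String key : map.keySet()) {
        // The first put is a structural modification made during iteration;
        // later puts only replace the value of an existing key.
        map.put("late_arrival", 0L);
      }
      return true;
    } catch (ConcurrentModificationException e) {
      // HashMap iterators are fail-fast and detect the modification.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("HashMap completed: "
        + iterateWhileAdding(new HashMap<>()));
    System.out.println("ConcurrentHashMap completed: "
        + iterateWhileAdding(new ConcurrentHashMap<>()));
  }
}
```

If the key set of every map is fixed before the `IOStatistics` instance is published, which cannot be confirmed from the code quoted here, then iteration over a plain map may still be safe; the sketch only covers the case where entries are added later.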

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+import static org.apache.hadoop.fs.statistics.IOStatistics.MIN_UNSET_VALUE;
+
+/**
+ * Support for implementing IOStatistics interfaces.
+ */
+public final class IOStatisticsBinding {
+
+  /** Pattern used for each entry. */
+  public static final String ENTRY_PATTERN = "(%s=%s)";
+
+  /** String to return when a source is null. */
+  @VisibleForTesting
+  public static final String NULL_SOURCE = "()";
+
+  private IOStatisticsBinding() {
+  }
+
+  /**
+   * Create  IOStatistics from a storage statistics instance.
+   * <p></p>
+   * This will be updated as the storage statistics change.
+   * @param storageStatistics source data.
+   * @return an IO statistics source.
+   */
+  public static IOStatistics fromStorageStatistics(
+      StorageStatistics storageStatistics) {
+    DynamicIOStatisticsBuilder builder = dynamicIOStatistics();
+    Iterator<StorageStatistics.LongStatistic> it = storageStatistics
+        .getLongStatistics();
+    while (it.hasNext()) {
+      StorageStatistics.LongStatistic next = it.next();
+      builder.withLongFunctionCounter(next.getName(),
+          k -> storageStatistics.getLong(k));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Create a builder for dynamic IO Statistics.
+   * @return a builder to be completed.
+   */
+  public static DynamicIOStatisticsBuilder dynamicIOStatistics() {
+    return new DynamicIOStatisticsBuilder();
+  }
+
+  /**
+   * Get the shared instance of the immutable empty statistics
+   * object.
+   * @return an empty statistics object.
+   */
+  public static IOStatistics emptyStatistics() {
+    return EmptyIOStatistics.getInstance();
+  }
+
+  /**
+   * Take an IOStatistics instance and wrap it in a source.
+   * @param statistics statistics.
+   * @return a source which will return the values
+   */
+  public static IOStatisticsSource wrap(IOStatistics statistics) {
+    return new SourceWrappedStatistics(statistics);
+  }
+
+  /**
+   * Create a builder for an {@link IOStatisticsStore}.
+   *
+   * @return a builder instance.
+   */
+  public static IOStatisticsStoreBuilder iostatisticsStore() {
+    return new IOStatisticsStoreBuilderImpl();
+  }
+
+  /**
+   * Convert an entry to the string format used in logging.
+   *
+   * @param entry entry to evaluate
+   * @param <E> entry type
+   * @return formatted string
+   */
+  public static <E> String entryToString(
+      final Map.Entry<String, E> entry) {
+    return entryToString(entry.getKey(), entry.getValue());
+  }
+
+  /**
+   * Convert entry values to the string format used in logging.
+   *
+   * @param name statistic name
+   * @param value stat value
+   * @return formatted string
+   */
+  public static <E> String entryToString(
+      final String name, final E value) {
+    return String.format(
+        ENTRY_PATTERN,
+        name,
+        value);
+  }
+
+  /**
+   * Copy into the dest map all the source entries.
+   * The destination is cleared first.
+   * @param <E> entry type
+   * @param dest destination of the copy
+   * @param source source
+   * @param copyFn function to copy entries
+   * @return the destination.
+   */
+  private static <E> Map<String, E> copyMap(
+      Map<String, E> dest,
+      Map<String, E> source,
+      Function<E, E> copyFn) {
+    // we have to clone the values so that they aren't
+    // bound to the original values
+    dest.clear();
+    source.entrySet()
+        .forEach(entry ->
+            dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
+    return dest;
+  }
+
+  /**
+   * A passthrough copy operation suitable for immutable
+   * types, including numbers.
+   * @param src source object
+   * @return the source object
+   */
+  public static <E extends Serializable> E passthroughFn(E src) {
+    return src;
+  }
+
+  /**
+   * Take a snapshot of a supplied map, where the copy option simply
+   * uses the existing value.
+   * <p></p>
+   * For this to be safe, the map must refer to immutable objects.
+   * @param source source map
+   * @param <E> type of values.
+   * @return a new map referencing the same values.
+   */
+  public static <E extends Serializable> TreeMap<String, E> snapshotMap(
+      Map<String, E> source) {
+    return snapshotMap(source,
+        IOStatisticsBinding::passthroughFn);
+  }
+
+  /**
+   * Take a snapshot of a supplied map, using the copy function
+   * to replicate the source values.
+   * @param source source map
+   * @param copyFn function to copy the value
+   * @param <E> type of values.
+   * @return a new map referencing the same values.
+   */
+  public static <E extends Serializable> TreeMap<String, E> snapshotMap(
+      Map<String, E> source,
+      Function<E, E> copyFn) {
+    TreeMap<String, E> dest = new TreeMap<>();
+    copyMap(dest, source, copyFn);
+    return dest;
+  }
+
+  /**
+   * Aggregate two maps so that the destination holds the aggregated values.
+   * @param <E> type of values
+   * @param dest destination map.
+   * @param other other map
+   * @param aggregateFn function to aggregate the values.
+   * @param copyFn function to copy the value
+   */
+  public static <E> void aggregateMaps(
+      Map<String, E> dest,
+      Map<String, E> other,
+      BiFunction<E, E, E> aggregateFn,
+      final Function<E, E> copyFn) {
+    // scan through the other hand map; copy
+    // any values not in the left map,
+    // aggregate those for which there is already
+    // an entry
+    other.entrySet().forEach(entry -> {
+      String key = entry.getKey();
+      E rVal = entry.getValue();
+      E lVal = dest.get(key);
+      if (lVal == null) {
+        dest.put(key, copyFn.apply(rVal));
+      } else {
+        dest.put(key, aggregateFn.apply(lVal, rVal));
+      }
+    });
+  }
+
+  public static Long aggregateCounters(Long l, Long r) {

Review comment:
       If they're not nullable, could we use the primitive `long` type instead?
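
As a sketch of what this could look like: a method taking and returning primitive `long` can still be passed as the `BiFunction<E, E, E>` that `aggregateMaps` expects, because method references permit boxing and unboxing conversions. The class name is hypothetical, and the method body (plain addition) is an assumption, since the body of `aggregateCounters` is not shown in the quoted diff.

```java
import java.util.function.BiFunction;

/** Hypothetical demo class; not part of the patch under review. */
final class PrimitiveAggregateDemo {

  /**
   * Primitive variant of the aggregation function.
   * Assumption: counters aggregate by simple addition.
   */
  static long aggregateCounters(long l, long r) {
    return l + r;
  }

  public static void main(String[] args) {
    // The primitive method still binds to the boxed functional interface.
    BiFunction<Long, Long, Long> fn = PrimitiveAggregateDemo::aggregateCounters;
    System.out.println(fn.apply(3L, 4L));  // prints 7
  }
}
```

One trade-off: if a null value ever reaches the function through the boxed interface, unboxing throws a `NullPointerException`, so this only fits if the map values are guaranteed non-null.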

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsStoreImpl.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.dynamicIOStatistics;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.maybeUpdateMaximum;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.maybeUpdateMinimum;
+
+/**
+ * Implementation of {@link IOStatisticsStore}.
+ */
+final class IOStatisticsStoreImpl extends WrappedIOStatistics
+    implements IOStatisticsStore {
+
+  /**
+   * Log changes at debug.
+   * Noisy, but occasionally useful.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsStoreImpl.class);
+
+  private final Map<String, AtomicLong> counterMap = new HashMap<>();
+

Review comment:
       I'd consider using a ConcurrentHashMap if possible
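
A minimal sketch of the suggestion, assuming counters may be registered lazily after construction. Only the field name `counterMap` is taken from the diff; the class and method names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical demo class; not part of the patch under review. */
final class ConcurrentCounterDemo {

  private final Map<String, AtomicLong> counterMap = new ConcurrentHashMap<>();

  /** Atomically create the counter if needed, then add the delta. */
  long increment(String key, long delta) {
    return counterMap.computeIfAbsent(key, k -> new AtomicLong())
        .addAndGet(delta);
  }

  /** Current value of a counter, or 0 if it was never incremented. */
  long get(String key) {
    AtomicLong counter = counterMap.get(key);
    return counter == null ? 0 : counter.get();
  }

  /** Two threads each increment the same counter 10,000 times. */
  static long runDemo() {
    ConcurrentCounterDemo store = new ConcurrentCounterDemo();
    Runnable work = () -> {
      for (int i = 0; i < 10_000; i++) {
        store.increment("bytes_read", 1);
      }
    };
    Thread t1 = new Thread(work);
    Thread t2 = new Thread(work);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return store.get("bytes_read");
  }

  public static void main(String[] args) {
    System.out.println(runDemo());  // prints 20000
  }
}
```

`computeIfAbsent` on a `ConcurrentHashMap` is atomic, so two threads racing to create the same counter end up sharing one `AtomicLong` instead of overwriting each other's entry.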

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsLogging.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
+/**
+ * Utility operations to convert IO Statistics sources/instances
+ * to strings, especially for robust logging.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class IOStatisticsLogging {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsLogging.class);
+
+  private IOStatisticsLogging() {
+  }
+
+  /**
+   * Extract the statistics from a source object -or ""
+   * if it is not a source of statistics
+   * <p>
+   * Exceptions are caught and downgraded to debug logging.
+   * @param source source of statistics.
+   * @return a string for logging.
+   */
+  public static String ioStatisticsSourceToString(@Nullable Object source) {
+    try {
+      return ioStatisticsToString(retrieveIOStatistics(source));
+    } catch (RuntimeException e) {
+      LOG.debug("Ignoring", e);
+      return "";
+    }
+  }
+
+  /**
+   * Convert IOStatistics to a string form.
+   * @param statistics A statistics instance.
+   * @return string value or the empty string if null
+   */
+  public static String ioStatisticsToString(
+      @Nullable final IOStatistics statistics) {
+    if (statistics != null) {
+      StringBuilder sb = new StringBuilder();
+      mapToString(sb, "counters", statistics.counters());
+      mapToString(sb, "gauges", statistics.gauges());
+      mapToString(sb, "minimums", statistics.minimums());
+      mapToString(sb, "maximums", statistics.maximums());
+      mapToString(sb, "means", statistics.meanStatistics());
+
+      return sb.toString();
+    } else {
+      return "";
+    }
+  }
+
+  private static <E> void mapToString(StringBuilder sb,
+      final String type,
+      final Map<String, E> map) {
+    int count = 0;
+    sb.append(type);
+    sb.append("=(");
+    for (Map.Entry<String, E> entry : map.entrySet()) {
+      if (count > 0) {
+        sb.append(' ');
+      }
+      count++;
+      sb.append(IOStatisticsBinding.entryToString(
+          entry.getKey(), entry.getValue()));
+    }

Review comment:
       If Java streams are available, this loop could be written as something like:
   
   ```java
   sb.append(map.entrySet().stream()
     .map(entry -> IOStatisticsBinding.entryToString(entry.getKey(), entry.getValue()))
     .collect(Collectors.joining(" ")));
   ```
   
   (FYI: I wrote this without a compiler, so please verify. It needs an import of `java.util.stream.Collectors`.)
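
For checking the streams approach in isolation, a self-contained sketch follows. The class `JoinEntriesDemo` and the method `joinEntries` are hypothetical; `ENTRY_PATTERN` and `entryToString` are local copies modelled on the `IOStatisticsBinding` code quoted above, so that the example compiles without Hadoop on the classpath.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical demo class; not part of the patch under review. */
final class JoinEntriesDemo {

  /** Local copy of the entry pattern quoted from IOStatisticsBinding. */
  static final String ENTRY_PATTERN = "(%s=%s)";

  /** Local stand-in for IOStatisticsBinding.entryToString(name, value). */
  static <E> String entryToString(String name, E value) {
    return String.format(ENTRY_PATTERN, name, value);
  }

  /** Format every entry and join the results with a single space. */
  static <E> String joinEntries(Map<String, E> map) {
    return map.entrySet().stream()
        .map(entry -> entryToString(entry.getKey(), entry.getValue()))
        .collect(Collectors.joining(" "));
  }

  public static void main(String[] args) {
    Map<String, Long> counters = new TreeMap<>();
    counters.put("bytes_read", 1024L);
    counters.put("seek_operations", 3L);
    // prints counters=((bytes_read=1024) (seek_operations=3))
    System.out.println("counters=(" + joinEntries(counters) + ")");
  }
}
```

`Collectors.joining(" ")` places the separator only between elements, which replaces the manual `count` bookkeeping in the loop.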



