You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/02/02 08:26:32 UTC

[GitHub] [hadoop] mehakmeet commented on a change in pull request #3930: HADOOP-18091. S3A auditing leaks memory through ThreadLocal references

mehakmeet commented on a change in pull request #3930:
URL: https://github.com/apache/hadoop/pull/3930#discussion_r797340367



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A map of keys type K to objects of type V which uses weak references,
+ * so does lot leak memory through long-lived references
+ * <i>at the expense of losing references when GC takes place.</i>.
+ *
+ * This class is intended be used instead of ThreadLocal storage when
+ * references are to be cleaned up when the instance holding.
+ * In this use case, the key is the Long key.
+ *
+ * Concurrency.
+ * The class assumes that map entries are rarely contended for when writing,
+ * and that not blocking other threads is more important than atomicity.
+ * - a ConcurrentHashMap is used to map keys to weak references, with
+ *   all its guarantees.
+ * - there is no automatic pruning.
+ * - see {@link #create(Object)} for the concurrency semantics on entry creation.
+ */
+@InterfaceAudience.Private
+public class WeakReferenceMap<K, V> {
+
+  /**
+   * The reference map.
+   */
+  private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();
+
+  /**
+   * Supplier of new instances.
+   */
+  private final Function<? super K, ? extends V> factory;
+
+  /**
+   * Nullable callback when a get on a key got a weak reference back.
+   * The assumption is that this is for logging/stats, which is why
+   * no attempt is made to use the call as a supplier of a new value.
+   */
+  private final Consumer<? super K> referenceLost;
+
+  /**
+   * Counter of references lost.
+   */
+  private final AtomicLong referenceLostCount = new AtomicLong();
+
+  /**
+   * Counter of entries created.
+   */
+  private final AtomicLong entriesCreatedCount = new AtomicLong();
+
+  /**
+   * instantiate.
+   * @param factory supplier of new instances
+   * @param referenceLost optional callback on lost references.
+   */
+  public WeakReferenceMap(
+      Function<? super K, ? extends V> factory,
+      @Nullable final Consumer<? super K> referenceLost) {
+
+    this.factory = requireNonNull(factory);
+    this.referenceLost = referenceLost;
+  }
+
+  @Override
+  public String toString() {
+    return "WeakReferenceMap{" +
+        "size=" + size() +
+        ", referenceLostCount=" + referenceLostCount +
+        ", entriesCreatedCount=" + entriesCreatedCount +
+        '}';
+  }
+
+  /**
+   * Map size.
+   * @return the current map size.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Clear all entries.
+   */
+  public void clear() {
+    map.clear();
+  }
+
+  /**
+   * look up the value, returning the possibly empty weak reference
+   * to a value, or null if no value was found.
+   * @param key key to look up
+   * @return null if there is no entry, a weak reference if found
+   */
+  public WeakReference<V> lookup(K key) {
+    return map.get(key);
+  }
+
+  /**
+   * Get the value, creating if needed.
+   * @param key key.
+   * @return an instance.
+   */
+  public V get(K key) {
+    final WeakReference<V> current = lookup(key);
+    V val = resolve(current);
+    if (val != null) {
+      // all good.
+      return  val;
+    }
+
+    // here, either no ref, or the value is null
+    if (current != null) {
+      noteLost(key);
+    }
+    return create(key);
+  }
+
+  /**
+   * Create a new instance under a key.
+   * The instance is created, added to the map and then the
+   * map value retrieved.
+   * This ensures that the reference returned is that in the map,
+   * even if there is more than one entry being created at the same time.
+   * @param key key
+   * @return the value
+   */
+  public V create(K key) {
+    entriesCreatedCount.incrementAndGet();
+    WeakReference<V> newRef = new WeakReference<>(
+        requireNonNull(factory.apply(key)));
+    map.put(key, newRef);
+    return map.get(key).get();
+  }
+
+  /**
+   * Put a value under the key.
+   * A null value can be put, though on a get() call
+   * a new entry is generated
+   *
+   * @param key key
+   * @param value value
+   * @return any old non-null reference.
+   */
+  public V put(K key, V value) {
+    return resolve(map.put(key, new WeakReference<>(value)));
+  }
+
+  /**
+   * Remove any value under the key.
+   * @param key key
+   * @return any old non-null reference.
+   */
+  public V remove(K key) {
+    return resolve(map.remove(key));
+  }
+
+  /**
+   * Does the map have a valid reference for this object?
+   * no-side effects: there's no attempt to notify or cleanup
+   * if the reference is null.
+   * @param key key to look up
+   * @return true if there is a valid reference.
+   */
+  public boolean containsKey(K key) {
+    final WeakReference<V> current = lookup(key);
+    return resolve(current) != null;
+  }
+
+  /**
+   * Given a possibly null weak reference, resolve
+   * its value.
+   * @param r reference to resolve
+   * @return the value or null
+   */
+  private V resolve(WeakReference<V> r) {
+    return r == null ? null : r.get();
+  }
+
+  /**
+   * Prune all null weak references, calling the referenceLost
+   * callback for each one.
+   *
+   * non-atomic and non-blocking.
+   * @return the number of entries pruned.
+   */
+  public int prune() {

Review comment:
       Why do we need to prune for WeakReferences? I thought the point of using them was that we don't need to do this manually, and that they'll be cleared automatically by the GC. Maybe I'm not understanding this right; please let me know.

##########
File path: hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
##########
@@ -82,3 +82,6 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
 #log4j.logger.org.apache.hadoop.fs.s3a.audit=DEBUG
 # log request creation, span lifecycle and other low-level details
 #log4j.logger.org.apache.hadoop.fs.s3a.audit=TRACE
+
+# uncomment this to trace where context entries are set
+# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE

Review comment:
       nit: add a newline at EOF

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.util.WeakReferenceMap;
+
+/**
+ * A WeakReferenceMap for threads.
+ * @param <V> value type of the map
+ */
+public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {
+
+  public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
+      @Nullable final Consumer<? super Long> referenceLost) {
+    super(factory, referenceLost);
+  }
+
+  public V getForCurrentThread() {
+    return get(curentThreadId());
+  }
+
+  public V removeForCurrentThread() {
+    return remove(curentThreadId());
+  }
+
+  public long curentThreadId() {

Review comment:
       typo: "current"

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java
##########
@@ -111,6 +118,14 @@
   public static final String NOT_A_WRAPPED_SPAN
       = "Span attached to request is not a wrapped span";
 
+  /**
+   * Arbritary threshold for triggering pruning on deactivation.

Review comment:
       typo: "Arbitrary"

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A map of keys type K to objects of type V which uses weak references,
+ * so does lot leak memory through long-lived references
+ * <i>at the expense of losing references when GC takes place.</i>.
+ *
+ * This class is intended be used instead of ThreadLocal storage when
+ * references are to be cleaned up when the instance holding.
+ * In this use case, the key is the Long key.
+ *
+ * Concurrency.
+ * The class assumes that map entries are rarely contended for when writing,
+ * and that not blocking other threads is more important than atomicity.
+ * - a ConcurrentHashMap is used to map keys to weak references, with
+ *   all its guarantees.
+ * - there is no automatic pruning.
+ * - see {@link #create(Object)} for the concurrency semantics on entry creation.
+ */
+@InterfaceAudience.Private
+public class WeakReferenceMap<K, V> {
+
+  /**
+   * The reference map.
+   */
+  private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();
+
+  /**
+   * Supplier of new instances.
+   */
+  private final Function<? super K, ? extends V> factory;
+
+  /**
+   * Nullable callback when a get on a key got a weak reference back.
+   * The assumption is that this is for logging/stats, which is why
+   * no attempt is made to use the call as a supplier of a new value.
+   */
+  private final Consumer<? super K> referenceLost;
+
+  /**
+   * Counter of references lost.
+   */
+  private final AtomicLong referenceLostCount = new AtomicLong();
+
+  /**
+   * Counter of entries created.
+   */
+  private final AtomicLong entriesCreatedCount = new AtomicLong();
+
+  /**
+   * instantiate.
+   * @param factory supplier of new instances
+   * @param referenceLost optional callback on lost references.
+   */
+  public WeakReferenceMap(
+      Function<? super K, ? extends V> factory,
+      @Nullable final Consumer<? super K> referenceLost) {
+
+    this.factory = requireNonNull(factory);
+    this.referenceLost = referenceLost;
+  }
+
+  @Override
+  public String toString() {
+    return "WeakReferenceMap{" +
+        "size=" + size() +
+        ", referenceLostCount=" + referenceLostCount +
+        ", entriesCreatedCount=" + entriesCreatedCount +
+        '}';
+  }
+
+  /**
+   * Map size.
+   * @return the current map size.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Clear all entries.
+   */
+  public void clear() {
+    map.clear();
+  }
+
+  /**
+   * look up the value, returning the possibly empty weak reference
+   * to a value, or null if no value was found.
+   * @param key key to look up
+   * @return null if there is no entry, a weak reference if found
+   */
+  public WeakReference<V> lookup(K key) {
+    return map.get(key);
+  }
+
+  /**
+   * Get the value, creating if needed.
+   * @param key key.
+   * @return an instance.
+   */
+  public V get(K key) {
+    final WeakReference<V> current = lookup(key);
+    V val = resolve(current);
+    if (val != null) {
+      // all good.
+      return  val;
+    }
+
+    // here, either no ref, or the value is null
+    if (current != null) {
+      noteLost(key);
+    }
+    return create(key);
+  }
+
+  /**
+   * Create a new instance under a key.
+   * The instance is created, added to the map and then the
+   * map value retrieved.
+   * This ensures that the reference returned is that in the map,
+   * even if there is more than one entry being created at the same time.
+   * @param key key
+   * @return the value
+   */
+  public V create(K key) {
+    entriesCreatedCount.incrementAndGet();
+    WeakReference<V> newRef = new WeakReference<>(
+        requireNonNull(factory.apply(key)));
+    map.put(key, newRef);
+    return map.get(key).get();
+  }
+
+  /**
+   * Put a value under the key.
+   * A null value can be put, though on a get() call
+   * a new entry is generated
+   *
+   * @param key key
+   * @param value value
+   * @return any old non-null reference.
+   */
+  public V put(K key, V value) {
+    return resolve(map.put(key, new WeakReference<>(value)));
+  }
+
+  /**
+   * Remove any value under the key.
+   * @param key key
+   * @return any old non-null reference.
+   */
+  public V remove(K key) {
+    return resolve(map.remove(key));
+  }
+
+  /**
+   * Does the map have a valid reference for this object?
+   * no-side effects: there's no attempt to notify or cleanup
+   * if the reference is null.
+   * @param key key to look up
+   * @return true if there is a valid reference.
+   */
+  public boolean containsKey(K key) {
+    final WeakReference<V> current = lookup(key);
+    return resolve(current) != null;
+  }
+
+  /**
+   * Given a possibly null weak reference, resolve
+   * its value.
+   * @param r reference to resolve
+   * @return the value or null
+   */
+  private V resolve(WeakReference<V> r) {
+    return r == null ? null : r.get();
+  }
+
+  /**
+   * Prune all null weak references, calling the referenceLost
+   * callback for each one.
+   *
+   * non-atomic and non-blocking.
+   * @return the number of entries pruned.
+   */
+  public int prune() {
+    int count = 0;
+    final Iterator<Map.Entry<K, WeakReference<V>>> it = map.entrySet().iterator();
+    while (it.hasNext()) {
+      final Map.Entry<K, WeakReference<V>> next = it.next();
+      if (next.getValue().get() == null) {
+        it.remove();
+        count++;
+        noteLost(next.getKey());
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Notify the reference lost callback.
+   * @param key key of lost reference
+   */
+  private void noteLost(final K key) {
+    // incrment local counter

Review comment:
       typo: "increment"

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.impl;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME;
+import static org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A.PRUNE_THRESHOLD;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
+
+/**
+ * This test attempts to recreate the OOM problems of
+ * HADOOP-18091. S3A auditing leaks memory through ThreadLocal references
+ * it does this by creating a thread pool, then
+ * creates and destroys FS instances, with threads in
+ * the pool (but not the main JUnit test thread) creating
+ * audit spans.
+ *
+ * With a custom audit span with a large memory footprint,
+ * memory demands will be high, and if the closed instances
+ * don't get cleaned up, memory runs out.
+ * GCs are forced.
+ * It is critical no spans are created in the junit thread because they will
+ * last for the duration of the test JVM.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase {
+  /**
+   * Logging.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class);
+
+  /** how many managers to sequentially create. */
+  private static final int MANAGER_COUNT = 500;
+
+  /** size of long lived hread pool. */
+  private static final int THREAD_COUNT = 20;
+  private ExecutorService workers;
+
+  /**
+   * count of prunings which have taken place in the manager lifecycle
+   * operations.
+   */
+  private int pruneCount;
+
+  /**
+   * As audit managers are created they are added to the list,
+   * so we can verify they get GC'd.
+   */
+  private final List<WeakReference<ActiveAuditManagerS3A>> auditManagers =
+      new ArrayList<>();
+
+  /**
+   * When the service is stopped, the span map is
+   * cleared immediately.
+   */
+  @Test
+  public void testSpanMapClearedInServiceStop() throws IOException {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      auditManager.getActiveAuditSpan();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+      Assertions.assertThat(spanMap.size())
+          .describedAs("map size")
+          .isEqualTo(1);
+      auditManager.stop();
+      Assertions.assertThat(spanMap.size())
+          .describedAs("map size")
+          .isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void testMemoryLeak() throws Throwable {
+    workers = Executors.newFixedThreadPool(THREAD_COUNT);
+    for (int i = 0; i < MANAGER_COUNT; i++) {
+      final long oneAuditConsumption = createAndTestOneAuditor();
+      LOG.info("manager {} memory retained {}", i, oneAuditConsumption);
+    }
+
+    // pruning must have taken place.
+    // that's somewhat implicit in the test not going OOM.
+    // but if memory allocation in test runs increase, it
+    // may cease to hold. in which case: create more
+    // audit managers
+    LOG.info("Totel prune count {}", pruneCount);
+
+    Assertions.assertThat(pruneCount)
+        .describedAs("Totel prune count")
+        .isNotZero();
+
+    // now count number of audit managers GC'd
+    // some must have been GC'd, showing that no other
+    // references are being retained internally.
+    Assertions.assertThat(auditManagers.stream()
+            .filter((r) -> r.get() == null)
+            .count())
+        .describedAs("number of audit managers garbage collected")
+        .isNotZero();
+  }
+
+  /**
+   * Create, use and then shutdown one auditor in a unique thread.
+   * @return memory consumed/released
+   */
+  private long createAndTestOneAuditor() throws Exception {
+    long original = Runtime.getRuntime().freeMemory();
+    ExecutorService factory = Executors.newSingleThreadExecutor();
+
+    pruneCount += factory.submit(this::createAuditorAndWorkers).get();
+
+    factory.shutdown();
+    factory.awaitTermination(60, TimeUnit.SECONDS);
+
+    final long current = Runtime.getRuntime().freeMemory();
+    return current - original;
+
+  }
+
+  /**
+   * This is the core of the leakage test.
+   * Create an audit manager and spans across multiple threads.
+   * The spans are created in the long-lived pool, so if there is
+   * any bonding of the life of managers/spans to that of threads,
+   * it will surface as OOM events.
+   * @return count of weak references whose reference values were
+   * nullified.
+   */
+  private int createAuditorAndWorkers()
+      throws IOException, InterruptedException, ExecutionException {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      LOG.info("Using {}", auditManager);
+      auditManagers.add(new WeakReference<>(auditManager));
+
+      // no guarantee every thread gets used, so track
+      // in a set. This will give us the thread ID of every
+      // entry in the map.
+
+      Set<Long> threadIds = new HashSet<>();
+
+      List<Future<Result>> futures = new ArrayList<>(THREAD_COUNT);
+
+      // perform the spanning operation in a long lived thread.
+      for (int i = 0; i < THREAD_COUNT; i++) {
+        futures.add(workers.submit(() -> spanningOperation(auditManager)));
+      }
+
+      // get the results and so determine the thread IDs
+      for (Future<Result> future : futures) {
+        final Result r = future.get();
+        threadIds.add(r.getThreadId());
+      }
+
+      final int threadsUsed = threadIds.size();
+      final Long[] threadIdArray = threadIds.toArray(new Long[0]);
+
+      // gc
+      System.gc();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+
+      // count number of spans removed
+      final long derefenced = threadIds.stream()
+          .filter((id) -> !spanMap.containsKey(id))
+          .count();
+      if (derefenced > 0) {
+        LOG.info("{} executed across {} threads and dereferenced {} entries",
+            auditManager, threadsUsed, derefenced);
+      }
+
+      // resolve not quite all of the threads.
+      // why not all? leaves at least one for pruning
+      // but it does complicate some of the assertions...
+      int spansRecreated = 0;
+      int subset = threadIdArray.length - 1;
+      LOG.info("Resolving {} thread references", subset);
+      for (int i = 0; i < subset; i++) {
+        final long id = threadIdArray[i];
+
+        // note whether or not the span is present
+        final boolean present = spanMap.containsKey(id);
+
+        // get the the span for that ID. which must never be
+        // null
+        Assertions.assertThat(spanMap.get(id))
+            .describedAs("Span map entry for thread %d", id)
+            .isNotNull();
+
+        // if it wasn't present, the unbounded span must therefore have been
+        // bounded to this map entry.
+        if (!present) {
+          spansRecreated++;
+        }
+      }
+      LOG.info("Recreated {} spans", subset);
+
+      // if the number of spans lost is more than the number
+      // of entries not probed, then at least one span was
+      // recreated
+      if (derefenced > threadIdArray.length - subset) {
+        Assertions.assertThat(spansRecreated)
+            .describedAs("number of recreated spans")
+            .isGreaterThan(0);
+      }
+
+      // now prune.
+      int pruned = auditManager.prune();
+      if (pruned > 0) {
+        LOG.info("{} executed across {} threads and pruned {} entries",
+            auditManager, threadsUsed, pruned);
+      }
+      Assertions.assertThat(pruned)
+          .describedAs("Count of references pruned")
+          .isEqualTo(derefenced - spansRecreated);
+      return pruned + (int) derefenced;
+    }
+
+  }
+
+  private Configuration createMemoryHungryConfiguration() {
+    final Configuration conf = new Configuration(false);
+    conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME);
+    return conf;
+  }
+
+  /**
+   * The operation in each worker thread.
+   * @param auditManager audit manager
+   * @return result of the call
+   * @throws IOException troluble
+   */
+  private Result spanningOperation(final ActiveAuditManagerS3A auditManager)
+      throws IOException {
+    auditManager.getActiveAuditSpan();
+    final AuditSpanS3A auditSpan =
+        auditManager.createSpan("span", null, null);
+    Assertions.assertThat(auditSpan)
+        .describedAs("audit span for current thread")
+        .isNotNull();
+    // this is needed to ensure that more of the thread pool is used up
+    Thread.yield();
+    return new Result(Thread.currentThread().getId());
+  }
+
+  /**
+   * Result of the spanning operation.
+   */
+  private static final class Result {
+    /** thread operation took place in. */
+    private final long threadId;
+
+
+    private Result(final long threadId) {
+      this.threadId = threadId;
+    }
+
+    private long getThreadId() {
+      return threadId;
+    }
+
+
+  }
+
+  /**
+   * Verifu that pruning takes place intermittently.

Review comment:
       typo: "Verify"

##########
File path: hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md
##########
@@ -119,16 +119,81 @@ The auditor then creates and returns a span for the specific operation.
 The AuditManagerS3A will automatically activate the span returned by the auditor
 (i.e. assign it the thread local variable tracking the active span in each thread).
 
-### Memory Leakage through `ThreadLocal` use
+### Memory Leakage through `ThreadLocal` misuse
 
-This architecture contains a critical defect,
+The original implementation of the integration with the S3AFileSystem class
+contained a critical defect,
 [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_.
 
-The code was written assuming that when the `ActiveAuditManagerS3A` service is
-stopped, it's `ThreadLocal` fields would be freed.
-In fact, they are retained until the threads with references are terminated.
+The original code was written with the assumption that when the `ActiveAuditManagerS3A` service was
+garbage collected, references in its `ThreadLocal` field would be freed.
+In fact, they are retained until all threads with references are terminated.
+If any long-lived thread had performed an s3 operation which created a span,
+a reference back to the audit manager instance was created
+*whose lifetime was that of the thread*
+
+In short-lived processes, and long-lived processes where a limited set of
+`S3AFileSystem` instances were reused, this had no adverse effect.
+Indeed, if the filesystem instances were retained in the cache until
+the process was shut down, there would be strong references to the owning
+`S3AFileSystem` instance anyway.
+
+Where it did have problems was when the following conditions were met
+1. Process was long-lived.
+2. Long-lived threads in the process invoked filesystem operations on `s3a://` URLs.
+3. Either `S3AFileSystem` instances were created repeatedly, rather than retrieved
+   from the cache of active instances.
+4. Or, after a query for a specific user was completed,
+   `FileSystem.closeAllForUGI(UserGroupInformation)` was invoked to remove all
+   cached FS instances of that user.
+
+Conditions 1, 2 and 4 are exactly those which long-lived Hive services can
+encounter.
+
+_Auditing was disabled by default until a fix was implemented._
+
+The memory leak has been fixed using a new class `org.apache.hadoop.util.WeakReferenceMap`
+to store a map of thread IDs to active spans. When the S3A fileystem is closed,

Review comment:
       typo: "filesystem"

##########
File path: hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md
##########
@@ -149,6 +214,39 @@ Spans may be used on different thread from that which they were created.
 Spans MUST always use the values from the `currentAuditContext()` in the creation
 thread.
 
+#### Memory Usage of `CommonAuditContext`
+
+The `CommonAuditContext` map has a `ThreadLocal` field to store global
+information which is intended to span multiple operations across multiple
+filesystems, for example the MapReduce or Spark job ID, which is set
+in the S3A committers.
+
+Applications and Hadoop code MUST NOT attach context entries
+which directly or indirectly consumes lots of memory, as the life
+of that memory use will become that of the thread.
+
+Applications and Hadoop code SHOULD remove context entries when
+no-longer needed.
+
+If memory leakage is suspected here, set the log
+`org.apache.hadoop.fs.audit.CommonAuditContext` to `TRACE`
+to log the origin of operations which add log entries.
+
+This will produce a log entry whose strack trace will show the caller chain.f

Review comment:
       typo: "stack trace"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org