You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2018/01/25 13:34:56 UTC

[sling-org-apache-sling-commons-threads] 01/02: SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git

commit 69c0e9ff0dcbb6e4e24ced6fdde4919816169a81
Author: Robert Munteanu <ro...@apache.org>
AuthorDate: Wed Jan 24 23:11:54 2018 +0200

    SLING-7432 - Thread pool clean up code can lead to infinite loops in
    ThreadLocal.get
    
    Stop using thread locals, as they might interfere with the save/restore
    logic. Got no more hangs in ThreadLocal code after this change.
---
 .../commons/threads/impl/ThreadLocalCleaner.java   | 220 ++++++++++++---------
 .../ThreadPoolExecutorCleaningThreadLocals.java    |  16 +-
 2 files changed, 137 insertions(+), 99 deletions(-)

diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
index b755034..5a85f72 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
@@ -21,12 +21,17 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 
 import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Notifies a {@link ThreadLocalChangeListener} about changes on a thread local storage. In addition it removes all references to variables
  * being added to the thread local storage while the cleaner was running with its {@link cleanup} method.
  * 
  * @see <a href="http://www.javaspecialists.eu/archive/Issue229.html">JavaSpecialist.eu - Cleaning ThreadLocals</a> */
 public class ThreadLocalCleaner {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalCleaner.class);
+    
     private final ThreadLocalChangeListener listener;
 
     /* Reflection fields */
@@ -47,15 +52,6 @@ public class ThreadLocalCleaner {
     private static Field threadLocalMapThresholdField;
     private static volatile IllegalStateException reflectionException;
 
-
-    public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
-        if (threadLocalsField == null || reflectionException != null) {
-            initReflectionFields();
-        }
-        this.listener = listener;
-        saveOldThreadLocals();
-    }
-
     private static synchronized void initReflectionFields() throws IllegalStateException {
         // check if previous initialization lead to an exception
         if (reflectionException != null) {
@@ -80,11 +76,89 @@ public class ThreadLocalCleaner {
             }
         }
     }
+    
+    /** @param c the class containing the field
+     * @param name the name of the field
+     * @return the field from the given class with the given name (made accessible)
+     * @throws NoSuchFieldException */
+    private static Field field(Class<?> c, String name)
+            throws NoSuchFieldException {
+        Field field = c.getDeclaredField(name);
+        field.setAccessible(true);
+        return field;
+    }
+
+    /** @param clazz the class containing the inner class
+     * @param name the name of the inner class
+     * @return the class with the given name, declared as inner class of the given class */
+    private static Class<?> inner(Class<?> clazz, String name) {
+        for (Class<?> c : clazz.getDeclaredClasses()) {
+            if (c.getSimpleName().equals(name)) {
+                return c;
+            }
+        }
+        throw new IllegalStateException(
+                "Could not find inner class " + name + " in " + clazz);
+    }
+    
+    private static Reference<?>[] copy(Field field) {
+        try {
+            Thread thread = Thread.currentThread();
+            Object threadLocals = field.get(thread);
+            if (threadLocals == null)
+                return null;
+            Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals);
+            return Arrays.copyOf(table, table.length);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+
+    private static Integer size(Field field, Field sizeField) {
+        try {
+            Thread thread = Thread.currentThread();
+            Object threadLocals = field.get(thread);
+            if (threadLocals == null)
+                return null;
+            return (Integer) sizeField.get(threadLocals);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+    
+    private ThreadLocalMapCopy threadLocalsCopy;
+    private ThreadLocalMapCopy inheritableThreadLocalsCopy;
+    
+    private static void restore(Field field, Object[] value, Integer size, Integer threshold) {
+        try {
+            Thread thread = Thread.currentThread();
+            if (value == null) {
+                field.set(thread, null);
+                LOG.debug("Restored {} to a null value", field.getName());
+            } else {
+                final Object threadLocals = field.get(thread);
+                tableField.set(threadLocals, value);
+                threadLocalMapSizeField.set(threadLocals, size);
+                threadLocalMapThresholdField.set(threadLocals, threshold);
+                LOG.debug("Restored {} with to {} references, size {}, threshold {}" ,field.getName(), value.length, size, threshold);
+            }
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+    
+    public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
+        if (threadLocalsField == null || reflectionException != null) {
+            initReflectionFields();
+        }
+        this.listener = listener;
+        saveOldThreadLocals();
+    }
 
     public void cleanup() {
         // the first two diff calls are only to notify the listener, the actual cleanup is done by restoreOldThreadLocals
-        diff(threadLocalsField, copyOfThreadLocals.get());
-        diff(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get());
+        diff(threadLocalsField, threadLocalsCopy.references);
+        diff(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references);
         restoreOldThreadLocals();
     }
 
@@ -114,8 +188,8 @@ public class ThreadLocalCleaner {
                 // nested loop - both arrays *should* be relatively small
                 next: for (Reference<?> curRef : current) {
                     if (curRef != null) {
-                        if (curRef.get() == copyOfThreadLocals ||
-                                curRef.get() == copyOfInheritableThreadLocals) {
+                        if (curRef.get() == this.threadLocalsCopy ||
+                                curRef.get() == this.inheritableThreadLocalsCopy) {
                             continue next;
                         }
                         for (Reference<?> backupRef : backup) {
@@ -151,96 +225,54 @@ public class ThreadLocalCleaner {
         }
     }
 
-    /** @param c the class containing the field
-     * @param name the name of the field
-     * @return the field from the given class with the given name (made accessible)
-     * @throws NoSuchFieldException */
-    private static Field field(Class<?> c, String name)
-            throws NoSuchFieldException {
-        Field field = c.getDeclaredField(name);
-        field.setAccessible(true);
-        return field;
+    private void saveOldThreadLocals() {
+        
+        threadLocalsCopy = new ThreadLocalMapCopy(copy(threadLocalsField), 
+                size(threadLocalsField, threadLocalMapSizeField),
+                size(threadLocalsField, threadLocalMapThresholdField));
+        threadLocalsCopy.debug("saved", "Thread locals");
+        
+        inheritableThreadLocalsCopy = new ThreadLocalMapCopy(copy(inheritableThreadLocalsField),
+                size(inheritableThreadLocalsField, threadLocalMapSizeField),
+                size(inheritableThreadLocalsField, threadLocalMapThresholdField));
+        inheritableThreadLocalsCopy.debug("saved", "Inheritable thread locals");
     }
 
-    /** @param clazz the class containing the inner class
-     * @param name the name of the inner class
-     * @return the class with the given name, declared as inner class of the given class */
-    private static Class<?> inner(Class<?> clazz, String name) {
-        for (Class<?> c : clazz.getDeclaredClasses()) {
-            if (c.getSimpleName().equals(name)) {
-                return c;
-            }
-        }
-        throw new IllegalStateException(
-                "Could not find inner class " + name + " in " + clazz);
-    }
-
-    private static final ThreadLocal<Reference<?>[]> copyOfThreadLocals = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfThreadLocalsSize = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfThreadLocalsThreshold = new ThreadLocal<>();
-    private static final ThreadLocal<Reference<?>[]> copyOfInheritableThreadLocals = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsSize = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsThreshold = new ThreadLocal<>();
-
-    private static void saveOldThreadLocals() {
-        copyOfThreadLocals.set(copy(threadLocalsField));
-        copyOfThreadLocalsSize.set(size(threadLocalsField, threadLocalMapSizeField));
-        copyOfThreadLocalsThreshold.set(size(threadLocalsField, threadLocalMapThresholdField));
-        copyOfInheritableThreadLocals.set(copy(inheritableThreadLocalsField));
-        copyOfInheritableThreadLocalsSize.set(size(inheritableThreadLocalsField, threadLocalMapSizeField));
-        copyOfInheritableThreadLocalsThreshold.set(size(inheritableThreadLocalsField, threadLocalMapThresholdField));
-    }
-
-    private static Reference<?>[] copy(Field field) {
+    private void restoreOldThreadLocals() {
         try {
-            Thread thread = Thread.currentThread();
-            Object threadLocals = field.get(thread);
-            if (threadLocals == null)
-                return null;
-            Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals);
-            return Arrays.copyOf(table, table.length);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
-        }
-    }
-
-    private static Integer size(Field field, Field sizeField) {
-        try {
-            Thread thread = Thread.currentThread();
-            Object threadLocals = field.get(thread);
-            if (threadLocals == null)
-                return null;
-            return (Integer) sizeField.get(threadLocals);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
-        }
-    }
-
-    private static void restoreOldThreadLocals() {
-        try {
-            restore(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get(),
-                copyOfInheritableThreadLocalsSize.get(), copyOfInheritableThreadLocalsThreshold.get());
-            restore(threadLocalsField, copyOfThreadLocals.get(),
-                copyOfThreadLocalsSize.get(), copyOfThreadLocalsThreshold.get());
+            restore(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references,
+                inheritableThreadLocalsCopy.size, inheritableThreadLocalsCopy.threshold);
+            restore(threadLocalsField, threadLocalsCopy.references,
+                threadLocalsCopy.size, threadLocalsCopy.threshold);
         } finally {
-            copyOfThreadLocals.remove();
-            copyOfInheritableThreadLocals.remove();
+            threadLocalsCopy = null;
+            inheritableThreadLocalsCopy = null;
         }
     }
 
-    private static void restore(Field field, Object value, Integer size, Integer threshold) {
-        try {
-            Thread thread = Thread.currentThread();
-            if (value == null) {
-                field.set(thread, null);
+    /**
+     * Helper class that encapsulates the state from a <tt>ThreadLocalMap</tt>
+     *
+     */
+    static class ThreadLocalMapCopy {
+        
+        private final Reference<?>[] references;
+        private final Integer size;
+        private final Integer threshold;
+        
+        private ThreadLocalMapCopy(Reference<?>[] references, Integer size, Integer threshold) {
+            this.references = references;
+            this.size = size;
+            this.threshold = threshold;
+        }
+        
+        void debug(String event, String mapName) {
+            if ( references != null ) {
+                ThreadLocalCleaner.LOG.debug("{}: {} {} references, size: {}, threshold: {}",
+                    mapName, event, references.length, size, threshold);
             } else {
-                final Object threadLocals = field.get(thread);
-                tableField.set(threadLocals, value);
-                threadLocalMapSizeField.set(threadLocals, size);
-                threadLocalMapThresholdField.set(threadLocals, threshold);
+                ThreadLocalCleaner.LOG.debug("{}: {} null references", mapName, event);
             }
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
         }
     }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
index 922f18b..8f7a9a6 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
@@ -18,6 +18,8 @@ package org.apache.sling.commons.threads.impl;
 
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -35,6 +37,8 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    private final ConcurrentMap<Thread, ThreadLocalCleaner> cleaners = new ConcurrentHashMap<>();
+    
     public ThreadPoolExecutorCleaningThreadLocals(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
@@ -48,26 +52,28 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor {
         this.listener = listener;
     }
 
-    private static final ThreadLocal<ThreadLocalCleaner> local = new ThreadLocal<>();
-
     protected void beforeExecute(Thread t, Runnable r) {
         LOGGER.debug("Collecting changes to ThreadLocal for thread {} from now on...", t);
         try {
             ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener);
-            local.set(cleaner);
+            cleaners.put(t, cleaner);
         } catch (Throwable e) {
             LOGGER.warn("Could not set up thread local cleaner (most probably not a compliant JRE): {}", e, e);
         }
+        
+        super.beforeExecute(t, r);
     }
 
     protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        
         LOGGER.debug("Cleaning up thread locals for thread {}...", Thread.currentThread());
-        ThreadLocalCleaner cleaner = local.get();
+        ThreadLocalCleaner cleaner = cleaners.remove(Thread.currentThread());
+
         if (cleaner != null) {
             cleaner.cleanup();
         } else {
             LOGGER.warn("Could not clean up thread locals in thread {} as the cleaner was not set up correctly", Thread.currentThread());
         }
-        local.remove();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
rombert@apache.org.