You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2018/01/25 13:34:55 UTC

[sling-org-apache-sling-commons-threads] branch master updated (1ffb9d9 -> e046d2f)

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git.


    from 1ffb9d9  SLING-7433 Improve logging in case ThreadLocalCleaner does not work
     new 69c0e9f  SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get
     new e046d2f  SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../commons/threads/impl/ThreadLocalCleaner.java   | 220 ++++++++++++---------
 .../ThreadPoolExecutorCleaningThreadLocals.java    |  19 +-
 2 files changed, 139 insertions(+), 100 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
rombert@apache.org.

[sling-org-apache-sling-commons-threads] 02/02: SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git

commit e046d2f36dd1727448571c590285c3156552edb4
Author: Robert Munteanu <ro...@apache.org>
AuthorDate: Thu Jan 25 15:34:42 2018 +0200

    SLING-7432 - Thread pool clean up code can lead to infinite loops in
    ThreadLocal.get
    
    Propagate exceptions that occur in beforeExecute, so that the fallback
    thread pool implementation is used.
---
 .../commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
index 8f7a9a6..ca1fd3b 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
@@ -57,8 +57,9 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor {
         try {
             ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener);
             cleaners.put(t, cleaner);
-        } catch (Throwable e) {
+        } catch (RuntimeException | Error e) {
             LOGGER.warn("Could not set up thread local cleaner (most probably not a compliant JRE): {}", e, e);
+            throw e;
         }
         
         super.beforeExecute(t, r);

-- 
To stop receiving notification emails like this one, please contact
rombert@apache.org.

[sling-org-apache-sling-commons-threads] 01/02: SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git

commit 69c0e9ff0dcbb6e4e24ced6fdde4919816169a81
Author: Robert Munteanu <ro...@apache.org>
AuthorDate: Wed Jan 24 23:11:54 2018 +0200

    SLING-7432 - Thread pool clean up code can lead to infinite loops in
    ThreadLocal.get
    
    Stop using thread locals, as they might interfere with the save/restore
    logic. Got no more hangs in ThreadLocal code after this change.
---
 .../commons/threads/impl/ThreadLocalCleaner.java   | 220 ++++++++++++---------
 .../ThreadPoolExecutorCleaningThreadLocals.java    |  16 +-
 2 files changed, 137 insertions(+), 99 deletions(-)

diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
index b755034..5a85f72 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
@@ -21,12 +21,17 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 
 import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Notifies a {@link ThreadLocalChangeListener} about changes on a thread local storage. In addition it removes all references to variables
  * being added to the thread local storage while the cleaner was running with its {@link cleanup} method.
  * 
  * @see <a href="http://www.javaspecialists.eu/archive/Issue229.html">JavaSpecialist.eu - Cleaning ThreadLocals</a> */
 public class ThreadLocalCleaner {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalCleaner.class);
+    
     private final ThreadLocalChangeListener listener;
 
     /* Reflection fields */
@@ -47,15 +52,6 @@ public class ThreadLocalCleaner {
     private static Field threadLocalMapThresholdField;
     private static volatile IllegalStateException reflectionException;
 
-
-    public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
-        if (threadLocalsField == null || reflectionException != null) {
-            initReflectionFields();
-        }
-        this.listener = listener;
-        saveOldThreadLocals();
-    }
-
     private static synchronized void initReflectionFields() throws IllegalStateException {
         // check if previous initialization lead to an exception
         if (reflectionException != null) {
@@ -80,11 +76,89 @@ public class ThreadLocalCleaner {
             }
         }
     }
+    
+    /** @param c the class containing the field
+     * @param name the name of the field
+     * @return the field from the given class with the given name (made accessible)
+     * @throws NoSuchFieldException */
+    private static Field field(Class<?> c, String name)
+            throws NoSuchFieldException {
+        Field field = c.getDeclaredField(name);
+        field.setAccessible(true);
+        return field;
+    }
+
+    /** @param clazz the class containing the inner class
+     * @param name the name of the inner class
+     * @return the class with the given name, declared as inner class of the given class */
+    private static Class<?> inner(Class<?> clazz, String name) {
+        for (Class<?> c : clazz.getDeclaredClasses()) {
+            if (c.getSimpleName().equals(name)) {
+                return c;
+            }
+        }
+        throw new IllegalStateException(
+                "Could not find inner class " + name + " in " + clazz);
+    }
+    
+    private static Reference<?>[] copy(Field field) {
+        try {
+            Thread thread = Thread.currentThread();
+            Object threadLocals = field.get(thread);
+            if (threadLocals == null)
+                return null;
+            Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals);
+            return Arrays.copyOf(table, table.length);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+
+    private static Integer size(Field field, Field sizeField) {
+        try {
+            Thread thread = Thread.currentThread();
+            Object threadLocals = field.get(thread);
+            if (threadLocals == null)
+                return null;
+            return (Integer) sizeField.get(threadLocals);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+    
+    private ThreadLocalMapCopy threadLocalsCopy;
+    private ThreadLocalMapCopy inheritableThreadLocalsCopy;
+    
+    private static void restore(Field field, Object[] value, Integer size, Integer threshold) {
+        try {
+            Thread thread = Thread.currentThread();
+            if (value == null) {
+                field.set(thread, null);
+                LOG.debug("Restored {} to a null value", field.getName());
+            } else {
+                final Object threadLocals = field.get(thread);
+                tableField.set(threadLocals, value);
+                threadLocalMapSizeField.set(threadLocals, size);
+                threadLocalMapThresholdField.set(threadLocals, threshold);
+                LOG.debug("Restored {} with to {} references, size {}, threshold {}" ,field.getName(), value.length, size, threshold);
+            }
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Access denied", e);
+        }
+    }
+    
+    public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
+        if (threadLocalsField == null || reflectionException != null) {
+            initReflectionFields();
+        }
+        this.listener = listener;
+        saveOldThreadLocals();
+    }
 
     public void cleanup() {
         // the first two diff calls are only to notify the listener, the actual cleanup is done by restoreOldThreadLocals
-        diff(threadLocalsField, copyOfThreadLocals.get());
-        diff(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get());
+        diff(threadLocalsField, threadLocalsCopy.references);
+        diff(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references);
         restoreOldThreadLocals();
     }
 
@@ -114,8 +188,8 @@ public class ThreadLocalCleaner {
                 // nested loop - both arrays *should* be relatively small
                 next: for (Reference<?> curRef : current) {
                     if (curRef != null) {
-                        if (curRef.get() == copyOfThreadLocals ||
-                                curRef.get() == copyOfInheritableThreadLocals) {
+                        if (curRef.get() == this.threadLocalsCopy ||
+                                curRef.get() == this.inheritableThreadLocalsCopy) {
                             continue next;
                         }
                         for (Reference<?> backupRef : backup) {
@@ -151,96 +225,54 @@ public class ThreadLocalCleaner {
         }
     }
 
-    /** @param c the class containing the field
-     * @param name the name of the field
-     * @return the field from the given class with the given name (made accessible)
-     * @throws NoSuchFieldException */
-    private static Field field(Class<?> c, String name)
-            throws NoSuchFieldException {
-        Field field = c.getDeclaredField(name);
-        field.setAccessible(true);
-        return field;
+    private void saveOldThreadLocals() {
+        
+        threadLocalsCopy = new ThreadLocalMapCopy(copy(threadLocalsField), 
+                size(threadLocalsField, threadLocalMapSizeField),
+                size(threadLocalsField, threadLocalMapThresholdField));
+        threadLocalsCopy.debug("saved", "Thread locals");
+        
+        inheritableThreadLocalsCopy = new ThreadLocalMapCopy(copy(inheritableThreadLocalsField),
+                size(inheritableThreadLocalsField, threadLocalMapSizeField),
+                size(inheritableThreadLocalsField, threadLocalMapThresholdField));
+        inheritableThreadLocalsCopy.debug("saved", "Inheritable thread locals");
     }
 
-    /** @param clazz the class containing the inner class
-     * @param name the name of the inner class
-     * @return the class with the given name, declared as inner class of the given class */
-    private static Class<?> inner(Class<?> clazz, String name) {
-        for (Class<?> c : clazz.getDeclaredClasses()) {
-            if (c.getSimpleName().equals(name)) {
-                return c;
-            }
-        }
-        throw new IllegalStateException(
-                "Could not find inner class " + name + " in " + clazz);
-    }
-
-    private static final ThreadLocal<Reference<?>[]> copyOfThreadLocals = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfThreadLocalsSize = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfThreadLocalsThreshold = new ThreadLocal<>();
-    private static final ThreadLocal<Reference<?>[]> copyOfInheritableThreadLocals = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsSize = new ThreadLocal<>();
-    private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsThreshold = new ThreadLocal<>();
-
-    private static void saveOldThreadLocals() {
-        copyOfThreadLocals.set(copy(threadLocalsField));
-        copyOfThreadLocalsSize.set(size(threadLocalsField, threadLocalMapSizeField));
-        copyOfThreadLocalsThreshold.set(size(threadLocalsField, threadLocalMapThresholdField));
-        copyOfInheritableThreadLocals.set(copy(inheritableThreadLocalsField));
-        copyOfInheritableThreadLocalsSize.set(size(inheritableThreadLocalsField, threadLocalMapSizeField));
-        copyOfInheritableThreadLocalsThreshold.set(size(inheritableThreadLocalsField, threadLocalMapThresholdField));
-    }
-
-    private static Reference<?>[] copy(Field field) {
+    private void restoreOldThreadLocals() {
         try {
-            Thread thread = Thread.currentThread();
-            Object threadLocals = field.get(thread);
-            if (threadLocals == null)
-                return null;
-            Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals);
-            return Arrays.copyOf(table, table.length);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
-        }
-    }
-
-    private static Integer size(Field field, Field sizeField) {
-        try {
-            Thread thread = Thread.currentThread();
-            Object threadLocals = field.get(thread);
-            if (threadLocals == null)
-                return null;
-            return (Integer) sizeField.get(threadLocals);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
-        }
-    }
-
-    private static void restoreOldThreadLocals() {
-        try {
-            restore(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get(),
-                copyOfInheritableThreadLocalsSize.get(), copyOfInheritableThreadLocalsThreshold.get());
-            restore(threadLocalsField, copyOfThreadLocals.get(),
-                copyOfThreadLocalsSize.get(), copyOfThreadLocalsThreshold.get());
+            restore(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references,
+                inheritableThreadLocalsCopy.size, inheritableThreadLocalsCopy.threshold);
+            restore(threadLocalsField, threadLocalsCopy.references,
+                threadLocalsCopy.size, threadLocalsCopy.threshold);
         } finally {
-            copyOfThreadLocals.remove();
-            copyOfInheritableThreadLocals.remove();
+            threadLocalsCopy = null;
+            inheritableThreadLocalsCopy = null;
         }
     }
 
-    private static void restore(Field field, Object value, Integer size, Integer threshold) {
-        try {
-            Thread thread = Thread.currentThread();
-            if (value == null) {
-                field.set(thread, null);
+    /**
+     * Helper class that encapsulates the state from a <tt>ThreadLocalMap</tt>
+     *
+     */
+    static class ThreadLocalMapCopy {
+        
+        private final Reference<?>[] references;
+        private final Integer size;
+        private final Integer threshold;
+        
+        private ThreadLocalMapCopy(Reference<?>[] references, Integer size, Integer threshold) {
+            this.references = references;
+            this.size = size;
+            this.threshold = threshold;
+        }
+        
+        void debug(String event, String mapName) {
+            if ( references != null ) {
+                ThreadLocalCleaner.LOG.debug("{}: {} {} references, size: {}, threshold: {}",
+                    mapName, event, references.length, size, threshold);
             } else {
-                final Object threadLocals = field.get(thread);
-                tableField.set(threadLocals, value);
-                threadLocalMapSizeField.set(threadLocals, size);
-                threadLocalMapThresholdField.set(threadLocals, threshold);
+                ThreadLocalCleaner.LOG.debug("{}: {} null references", mapName, event);
             }
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Access denied", e);
         }
     }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
index 922f18b..8f7a9a6 100644
--- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
+++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
@@ -18,6 +18,8 @@ package org.apache.sling.commons.threads.impl;
 
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -35,6 +37,8 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    private final ConcurrentMap<Thread, ThreadLocalCleaner> cleaners = new ConcurrentHashMap<>();
+    
     public ThreadPoolExecutorCleaningThreadLocals(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
@@ -48,26 +52,28 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor {
         this.listener = listener;
     }
 
-    private static final ThreadLocal<ThreadLocalCleaner> local = new ThreadLocal<>();
-
     protected void beforeExecute(Thread t, Runnable r) {
         LOGGER.debug("Collecting changes to ThreadLocal for thread {} from now on...", t);
         try {
             ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener);
-            local.set(cleaner);
+            cleaners.put(t, cleaner);
         } catch (Throwable e) {
             LOGGER.warn("Could not set up thread local cleaner (most probably not a compliant JRE): {}", e, e);
         }
+        
+        super.beforeExecute(t, r);
     }
 
     protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        
         LOGGER.debug("Cleaning up thread locals for thread {}...", Thread.currentThread());
-        ThreadLocalCleaner cleaner = local.get();
+        ThreadLocalCleaner cleaner = cleaners.remove(Thread.currentThread());
+
         if (cleaner != null) {
             cleaner.cleanup();
         } else {
             LOGGER.warn("Could not clean up thread locals in thread {} as the cleaner was not set up correctly", Thread.currentThread());
         }
-        local.remove();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
rombert@apache.org.