You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/02/04 22:57:43 UTC

[kafka] branch trunk updated: KAFKA-6288: Broken symlink interrupts scanning of the plugin path

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17aaff3  KAFKA-6288: Broken symlink interrupts scanning of the plugin path
17aaff3 is described below

commit 17aaff3606393b42d4e8ef5299141b5bb21300b0
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Sun Feb 4 14:57:25 2018 -0800

    KAFKA-6288: Broken symlink interrupts scanning of the plugin path
    
    Submitting a fail safe fix for rare IOExceptions on symbolic links.
    
    The fix is submitted without a test case since it does seem easy to reproduce such type of failures (just having a broken symbolic link does not reproduce the issue) and it's considered pretty low risk.
    
    If accepted, needs to be ported at least to 1.0, if not 0.11
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4481 from kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path
---
 checkstyle/suppressions.xml                        |  2 +-
 .../runtime/isolation/DelegatingClassLoader.java   | 29 +++++++++-------
 .../connect/runtime/isolation/PluginUtils.java     | 39 ++++++++++++++--------
 3 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f06155f..de1bdfd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 0c133b9..8a44d4d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -52,6 +52,7 @@ import java.util.TreeSet;
 
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+    private static final String CLASSPATH_NAME = "classpath";
 
     private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
     private final Map<String, String> aliases;
@@ -139,10 +140,23 @@ public class DelegatingClassLoader extends URLClassLoader {
     }
 
     protected void initLoaders() {
-        String path = null;
+        for (String configPath : pluginPaths) {
+            initPluginLoader(configPath);
+        }
+        // Finally add parent/system loader.
+        initPluginLoader(CLASSPATH_NAME);
+        addAllAliases();
+    }
+
+    private void initPluginLoader(String path) {
         try {
-            for (String configPath : pluginPaths) {
-                path = configPath;
+            if (CLASSPATH_NAME.equals(path)) {
+                scanUrlsAndAddPlugins(
+                        getParent(),
+                        ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+                        null
+                );
+            } else {
                 Path pluginPath = Paths.get(path).toAbsolutePath();
                 // Update for exception handling
                 path = pluginPath.toString();
@@ -156,14 +170,6 @@ public class DelegatingClassLoader extends URLClassLoader {
                     registerPlugin(pluginPath);
                 }
             }
-
-            path = "classpath";
-            // Finally add parent/system loader.
-            scanUrlsAndAddPlugins(
-                    getParent(),
-                    ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
-                    null
-            );
         } catch (InvalidPathException | MalformedURLException e) {
             log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
         } catch (IOException e) {
@@ -171,7 +177,6 @@ public class DelegatingClassLoader extends URLClassLoader {
         } catch (InstantiationException | IllegalAccessException e) {
             log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
         }
-        addAllAliases();
     }
 
     private void registerPlugin(Path pluginLocation)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d85986e..d490bde 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -133,7 +133,7 @@ public class PluginUtils {
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
             .Filter<Path>() {
         @Override
-        public boolean accept(Path path) throws IOException {
+        public boolean accept(Path path) {
             return Files.isDirectory(path) || isArchive(path) || isClassFile(path);
         }
     };
@@ -232,16 +232,29 @@ public class PluginUtils {
 
                 Path adjacent = neighbors.next();
                 if (Files.isSymbolicLink(adjacent)) {
-                    Path symlink = Files.readSymbolicLink(adjacent);
-                    // if symlink is absolute resolve() returns the absolute symlink itself
-                    Path parent = adjacent.getParent();
-                    if (parent == null) {
-                        continue;
-                    }
-                    Path absolute = parent.resolve(symlink).toRealPath();
-                    if (Files.exists(absolute)) {
-                        adjacent = absolute;
-                    } else {
+                    try {
+                        Path symlink = Files.readSymbolicLink(adjacent);
+                        // if symlink is absolute resolve() returns the absolute symlink itself
+                        Path parent = adjacent.getParent();
+                        if (parent == null) {
+                            continue;
+                        }
+                        Path absolute = parent.resolve(symlink).toRealPath();
+                        if (Files.exists(absolute)) {
+                            adjacent = absolute;
+                        } else {
+                            continue;
+                        }
+                    } catch (IOException e) {
+                        // See https://issues.apache.org/jira/browse/KAFKA-6288 for a reported
+                        // failure. Such a failure at this stage is not easily reproducible and
+                        // therefore an exception is caught and ignored after issuing a
+                        // warning. This allows class scanning to continue for non-broken plugins.
+                        log.warn(
+                                "Resolving symbolic link '{}' failed. Ignoring this path.",
+                                adjacent,
+                                e
+                        );
                         continue;
                     }
                 }
@@ -341,8 +354,8 @@ public class PluginUtils {
     }
 
     private static class DirectoryEntry {
-        DirectoryStream<Path> stream;
-        Iterator<Path> iterator;
+        final DirectoryStream<Path> stream;
+        final Iterator<Path> iterator;
 
         DirectoryEntry(DirectoryStream<Path> stream) {
             this.stream = stream;

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.