You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/02/04 22:57:43 UTC
[kafka] branch trunk updated: KAFKA-6288: Broken symlink interrupts
scanning of the plugin path
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 17aaff3 KAFKA-6288: Broken symlink interrupts scanning of the plugin path
17aaff3 is described below
commit 17aaff3606393b42d4e8ef5299141b5bb21300b0
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Sun Feb 4 14:57:25 2018 -0800
KAFKA-6288: Broken symlink interrupts scanning of the plugin path
Submitting a fail safe fix for rare IOExceptions on symbolic links.
The fix is submitted without a test case since it does seem easy to reproduce such type of failures (just having a broken symbolic link does not reproduce the issue) and it's considered pretty low risk.
If accepted, needs to be ported at least to 1.0, if not 0.11
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4481 from kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path
---
checkstyle/suppressions.xml | 2 +-
.../runtime/isolation/DelegatingClassLoader.java | 29 +++++++++-------
.../connect/runtime/isolation/PluginUtils.java | 39 ++++++++++++++--------
3 files changed, 44 insertions(+), 26 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f06155f..de1bdfd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
<suppress checks="NPathComplexity"
- files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/>
+ files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 0c133b9..8a44d4d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -52,6 +52,7 @@ import java.util.TreeSet;
public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+ private static final String CLASSPATH_NAME = "classpath";
private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
private final Map<String, String> aliases;
@@ -139,10 +140,23 @@ public class DelegatingClassLoader extends URLClassLoader {
}
protected void initLoaders() {
- String path = null;
+ for (String configPath : pluginPaths) {
+ initPluginLoader(configPath);
+ }
+ // Finally add parent/system loader.
+ initPluginLoader(CLASSPATH_NAME);
+ addAllAliases();
+ }
+
+ private void initPluginLoader(String path) {
try {
- for (String configPath : pluginPaths) {
- path = configPath;
+ if (CLASSPATH_NAME.equals(path)) {
+ scanUrlsAndAddPlugins(
+ getParent(),
+ ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
+ null
+ );
+ } else {
Path pluginPath = Paths.get(path).toAbsolutePath();
// Update for exception handling
path = pluginPath.toString();
@@ -156,14 +170,6 @@ public class DelegatingClassLoader extends URLClassLoader {
registerPlugin(pluginPath);
}
}
-
- path = "classpath";
- // Finally add parent/system loader.
- scanUrlsAndAddPlugins(
- getParent(),
- ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
- null
- );
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
} catch (IOException e) {
@@ -171,7 +177,6 @@ public class DelegatingClassLoader extends URLClassLoader {
} catch (InstantiationException | IllegalAccessException e) {
log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
}
- addAllAliases();
}
private void registerPlugin(Path pluginLocation)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index d85986e..d490bde 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -133,7 +133,7 @@ public class PluginUtils {
private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
.Filter<Path>() {
@Override
- public boolean accept(Path path) throws IOException {
+ public boolean accept(Path path) {
return Files.isDirectory(path) || isArchive(path) || isClassFile(path);
}
};
@@ -232,16 +232,29 @@ public class PluginUtils {
Path adjacent = neighbors.next();
if (Files.isSymbolicLink(adjacent)) {
- Path symlink = Files.readSymbolicLink(adjacent);
- // if symlink is absolute resolve() returns the absolute symlink itself
- Path parent = adjacent.getParent();
- if (parent == null) {
- continue;
- }
- Path absolute = parent.resolve(symlink).toRealPath();
- if (Files.exists(absolute)) {
- adjacent = absolute;
- } else {
+ try {
+ Path symlink = Files.readSymbolicLink(adjacent);
+ // if symlink is absolute resolve() returns the absolute symlink itself
+ Path parent = adjacent.getParent();
+ if (parent == null) {
+ continue;
+ }
+ Path absolute = parent.resolve(symlink).toRealPath();
+ if (Files.exists(absolute)) {
+ adjacent = absolute;
+ } else {
+ continue;
+ }
+ } catch (IOException e) {
+ // See https://issues.apache.org/jira/browse/KAFKA-6288 for a reported
+ // failure. Such a failure at this stage is not easily reproducible and
+ // therefore an exception is caught and ignored after issuing a
+ // warning. This allows class scanning to continue for non-broken plugins.
+ log.warn(
+ "Resolving symbolic link '{}' failed. Ignoring this path.",
+ adjacent,
+ e
+ );
continue;
}
}
@@ -341,8 +354,8 @@ public class PluginUtils {
}
private static class DirectoryEntry {
- DirectoryStream<Path> stream;
- Iterator<Path> iterator;
+ final DirectoryStream<Path> stream;
+ final Iterator<Path> iterator;
DirectoryEntry(DirectoryStream<Path> stream) {
this.stream = stream;
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.