You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:48 UTC

[34/50] [abbrv] kafka git commit: KAFKA-3611: Remove warnings when using reflections

KAFKA-3611: Remove warnings when using reflections

ewencp granders Can you take a look? Thanks!

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1259 from Ishiihara/fix-warning


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/316389d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/316389d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/316389d6

Branch: refs/heads/0.10.0
Commit: 316389d6adfb1398e30ca2ce5d586ea94d3f3110
Parents: 57831a5
Author: Liquan Pei <li...@gmail.com>
Authored: Thu Apr 28 11:59:02 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 28 11:59:02 2016 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |  6 +-
 checkstyle/import-control.xml                   |  1 +
 .../kafka/connect/runtime/AbstractHerder.java   | 10 ++-
 .../kafka/connect/util/ReflectionsUtil.java     | 90 ++++++++++++++++++++
 .../resources/ConnectorPluginsResourceTest.java |  3 +-
 5 files changed, 104 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index f45d8d4..88d43be 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -34,7 +34,11 @@ fi
 shopt -s nullglob
 for dir in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*;
 do
-  CLASSPATH=$CLASSPATH:$dir/*
+  if [ -z $CLASSPATH ] ; then
+    CLASSPATH=$dir/*
+  else
+    CLASSPATH=$CLASSPATH:$dir/*
+  fi
 done
 
 for file in $base_dir/examples/build/libs//kafka-examples*.jar;

http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 39d4ca3..7a45515 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -221,6 +221,7 @@
 
     <subpackage name="util">
       <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="org.reflections.vfs" />
       <!-- for annotations to avoid code duplication -->
       <allow pkg="com.fasterxml.jackson.annotation" />
     </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index bd73589..83f56e2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ReflectionsUtil;
 import org.reflections.Reflections;
 import org.reflections.util.ClasspathHelper;
 import org.reflections.util.ConfigurationBuilder;
@@ -82,11 +83,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected final ConfigBackingStore configBackingStore;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
-    private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
     private static List<ConnectorPluginInfo> validConnectorPlugins;
     private static final Object LOCK = new Object();
     private Thread classPathTraverser;
-
+    private static final List<Class<? extends Connector>> EXCLUDES = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
 
     public AbstractHerder(Worker worker,
                           String workerId,
@@ -263,10 +263,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             if (validConnectorPlugins != null) {
                 return validConnectorPlugins;
             }
+            ReflectionsUtil.registerUrlTypes();
+            ConfigurationBuilder builder = new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath());
+            Reflections reflections = new Reflections(builder);
 
-            Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
             Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
-            connectorClasses.removeAll(SKIPPED_CONNECTORS);
+            connectorClasses.removeAll(EXCLUDES);
             List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
             for (Class<? extends Connector> connectorClass : connectorClasses) {
                 int mod = connectorClass.getModifiers();

http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
new file mode 100644
index 0000000..fc3a0dd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.reflections.vfs.Vfs;
+import org.reflections.vfs.Vfs.Dir;
+import org.reflections.vfs.Vfs.File;
+import org.reflections.vfs.Vfs.UrlType;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The CLASSPATH on OS X can contain entries ending in .mar and .jnilib. The Vfs used by Reflections does not
+ * recognize URLs with those extensions and logs WARNs when scanning them. Those WARNs can be eliminated by
+ * registering URL types that treat such entries as empty directories before using Reflections.
+ */
+public class ReflectionsUtil {
+
+    private static final String FILE_PROTOCOL = "file";
+    private static final List<String> ENDINGS = Arrays.asList(".mar", ".jnilib", "*"); // "*" matches a literal trailing '*', e.g. a wildcard entry such as dir/*
+
+    public static void registerUrlTypes() {
+        final List<UrlType> urlTypes = new LinkedList<>();
+        urlTypes.add(new EmptyUrlType(ENDINGS)); // placed ahead of the Vfs defaults added below
+        urlTypes.addAll(Arrays.asList(Vfs.DefaultUrlTypes.values()));
+        Vfs.setDefaultURLTypes(urlTypes);
+    }
+
+    private static class EmptyUrlType implements UrlType {
+
+        private final List<String> endings;
+
+        private EmptyUrlType(final List<String> endings) {
+            this.endings = endings;
+        }
+
+        public boolean matches(URL url) {
+            final String protocol = url.getProtocol();
+            final String externalForm = url.toExternalForm();
+            if (!protocol.equals(FILE_PROTOCOL)) { // only file: URLs are candidates
+                return false;
+            }
+            for (String ending : endings) {
+                if (externalForm.endsWith(ending)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public Dir createDir(final URL url) throws Exception {
+            return emptyVfsDir(url);
+        }
+
+        private static Dir emptyVfsDir(final URL url) {
+            return new Dir() {
+                @Override
+                public String getPath() {
+                    return url.toExternalForm();
+                }
+
+                @Override
+                public Iterable<File> getFiles() {
+                    return Collections.emptyList(); // no files are exposed for a matched URL
+                }
+
+                @Override
+                public void close() {
+
+                }
+            };
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/316389d6/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 732db3d..241d331 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Recommender;
 import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Recommender;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.connect.connector.Connector;
@@ -149,6 +149,7 @@ public class ConnectorPluginsResourceTest {
         assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
     }
 
+
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends Connector {