You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/11/22 03:36:44 UTC

[samza] branch master updated: Allow users to configure URL's in reflection-based-resolver. (#1222)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1feaa50  Allow users to configure URL's in reflection-based-resolver. (#1222)
1feaa50 is described below

commit 1feaa50405a7b4a7ec1efada0cda20388a5f47b3
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Thu Nov 21 19:36:34 2019 -0800

    Allow users to configure URL's in reflection-based-resolver. (#1222)
---
 .../samza/sql/udf/ReflectionBasedUdfResolver.java  | 31 ++++++++++++++++++++--
 .../udf/impl/TestReflectionBasedUdfResolver.java   |  5 +---
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
index 43095b8..0a68a67 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
@@ -19,11 +19,14 @@
 package org.apache.samza.sql.udf;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -37,6 +40,8 @@ import org.reflections.util.ConfigurationBuilder;
 import org.reflections.util.FilterBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
 
 /**
  * An UDF resolver implementation that uses reflection to discover the subtypes
@@ -49,6 +54,8 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
   private static final Logger LOG = LoggerFactory.getLogger(ReflectionBasedUdfResolver.class);
 
   private static final String CONFIG_PACKAGE_PREFIX = "samza.sql.udf.resolver.package.prefix";
+  private static final String CONFIG_PACKAGE_FILTER = "samza.sql.udf.resolver.package.filter";
+  private static final String CONFIG_RESOURCE_URLS = "samza.sql.udf.resolver.urls";
 
   private final Set<UdfMetadata> udfs  = new HashSet<>();
 
@@ -56,13 +63,17 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
     // Searching the entire classpath to discover the subtypes of SamzaSqlUdf is expensive. To reduce the search space,
     // the search is limited to the set of package prefixes defined in the configuration.
     String samzaSqlUdfPackagePrefix = udfConfig.getOrDefault(CONFIG_PACKAGE_PREFIX, "org.apache.samza");
+    String samzaSqlUdfFilter = udfConfig.getOrDefault(CONFIG_PACKAGE_FILTER, ".*");
 
     // 1. Build the reflections instance  with appropriate configuration.
     ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
+    // Scan only the packages listed in the configured prefix, then apply the configured package filter to the inputs.
     configurationBuilder.forPackages(samzaSqlUdfPackagePrefix.split(","));
+    configurationBuilder.filterInputsBy(new FilterBuilder().includePackage(samzaSqlUdfFilter));
+    // Manually add the resource URLs if they are configured.
+    configurationBuilder.addUrls(getUrls(udfConfig));
+
     configurationBuilder.addClassLoader(Thread.currentThread().getContextClassLoader());
-    // Include only the SamzaSqlUDFClass implementations defined in the package prefix.
-    configurationBuilder.filterInputsBy(new FilterBuilder().includePackage(samzaSqlUdfPackagePrefix.split(",")));
     Reflections reflections = new Reflections(configurationBuilder);
 
     // 2. Get all the sub-types of SamzaSqlUdf.
@@ -92,6 +103,22 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
     }
   }
 
+  @VisibleForTesting
+  List<URL> getUrls(Config udfConfig) {
+    String urls = udfConfig.getOrDefault(CONFIG_RESOURCE_URLS, "");
+    List<URL> urlList = new ArrayList<>();
+    if (!urls.isEmpty()) {
+      for (String url : urls.split(",")) {
+        try {
+          urlList.add(new URL(url));
+        } catch (MalformedURLException e) {
+          LOG.error("Exception occurred when loading url: {}", url, e);
+        }
+      }
+    }
+    return urlList;
+  }
+
   @Override
   public Collection<UdfMetadata> getUdfs() {
     return udfs;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
index 5293e1d..d4c96af 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
@@ -21,9 +21,6 @@ package org.apache.samza.sql.udf.impl;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
@@ -48,7 +45,7 @@ public class TestReflectionBasedUdfResolver {
 
   @Test
   public void testUDfResolverShouldReturnAllUDFInClassPath() throws NoSuchMethodException {
-    Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.prefix", "org.apache.samza.sql.udf.impl"));
+    Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.filter", "org.apache.samza.sql.udf.impl"));
     ReflectionBasedUdfResolver reflectionBasedUdfResolver = new ReflectionBasedUdfResolver(config);
     Collection<UdfMetadata> udfMetadataList = reflectionBasedUdfResolver.getUdfs();