You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/11/22 03:36:44 UTC
[samza] branch master updated: Allow users to configure URL's in
reflection-based-resolver. (#1222)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 1feaa50 Allow users to configure URL's in reflection-based-resolver. (#1222)
1feaa50 is described below
commit 1feaa50405a7b4a7ec1efada0cda20388a5f47b3
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Thu Nov 21 19:36:34 2019 -0800
Allow users to configure URL's in reflection-based-resolver. (#1222)
---
.../samza/sql/udf/ReflectionBasedUdfResolver.java | 31 ++++++++++++++++++++--
.../udf/impl/TestReflectionBasedUdfResolver.java | 5 +---
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
index 43095b8..0a68a67 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/ReflectionBasedUdfResolver.java
@@ -19,11 +19,14 @@
package org.apache.samza.sql.udf;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -37,6 +40,8 @@ import org.reflections.util.ConfigurationBuilder;
import org.reflections.util.FilterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
/**
* An UDF resolver implementation that uses reflection to discover the subtypes
@@ -49,6 +54,8 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
private static final Logger LOG = LoggerFactory.getLogger(ReflectionBasedUdfResolver.class);
private static final String CONFIG_PACKAGE_PREFIX = "samza.sql.udf.resolver.package.prefix";
+ private static final String CONFIG_PACKAGE_FILTER = "samza.sql.udf.resolver.package.filter";
+ private static final String CONFIG_RESOURCE_URLS = "samza.sql.udf.resolver.urls";
private final Set<UdfMetadata> udfs = new HashSet<>();
@@ -56,13 +63,17 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
// Searching the entire classpath to discover the subtypes of SamzaSqlUdf is expensive. To reduce the search space,
// the search is limited to the set of package prefixes defined in the configuration.
String samzaSqlUdfPackagePrefix = udfConfig.getOrDefault(CONFIG_PACKAGE_PREFIX, "org.apache.samza");
+ String samzaSqlUdfFilter = udfConfig.getOrDefault(CONFIG_PACKAGE_FILTER, ".*");
// 1. Build the reflections instance with appropriate configuration.
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
+ // Include only the SamzaSqlUDFClass implementations defined in the package prefix.
configurationBuilder.forPackages(samzaSqlUdfPackagePrefix.split(","));
+ configurationBuilder.filterInputsBy(new FilterBuilder().includePackage(samzaSqlUdfFilter));
+ // Manually add the resource urls if they're configured.
+ configurationBuilder.addUrls(getUrls(udfConfig));
+
configurationBuilder.addClassLoader(Thread.currentThread().getContextClassLoader());
- // Include only the SamzaSqlUDFClass implementations defined in the package prefix.
- configurationBuilder.filterInputsBy(new FilterBuilder().includePackage(samzaSqlUdfPackagePrefix.split(",")));
Reflections reflections = new Reflections(configurationBuilder);
// 2. Get all the sub-types of SamzaSqlUdf.
@@ -92,6 +103,22 @@ public class ReflectionBasedUdfResolver implements UdfResolver {
}
}
+ @VisibleForTesting
+ List<URL> getUrls(Config udfConfig) {
+ String urls = udfConfig.getOrDefault(CONFIG_RESOURCE_URLS, "");
+ List<URL> urlList = new ArrayList<>();
+ if (!urls.isEmpty()) {
+ for (String url : urls.split(",")) {
+ try {
+ urlList.add(new URL(url));
+ } catch (MalformedURLException e) {
+ LOG.error("Exception occurred when loading url: {}", url, e);
+ }
+ }
+ }
+ return urlList;
+ }
+
@Override
public Collection<UdfMetadata> getUdfs() {
return udfs;
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
index 5293e1d..d4c96af 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
@@ -21,9 +21,6 @@ package org.apache.samza.sql.udf.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.interfaces.UdfMetadata;
@@ -48,7 +45,7 @@ public class TestReflectionBasedUdfResolver {
@Test
public void testUDfResolverShouldReturnAllUDFInClassPath() throws NoSuchMethodException {
- Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.prefix", "org.apache.samza.sql.udf.impl"));
+ Config config = new MapConfig(ImmutableMap.of("samza.sql.udf.resolver.package.filter", "org.apache.samza.sql.udf.impl"));
ReflectionBasedUdfResolver reflectionBasedUdfResolver = new ReflectionBasedUdfResolver(config);
Collection<UdfMetadata> udfMetadataList = reflectionBasedUdfResolver.getUdfs();