You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/12/23 04:30:38 UTC
[lucene-solr] branch branch_8x updated: SOLR-14125 : Streaming expressions to be loadable from packages
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 04b0a5d SOLR-14125 : Streaming expressions to be loadable from packages
04b0a5d is described below
commit 04b0a5d8f36d18c4c7d1de88dfb30ea5ea3454ed
Author: noble <no...@apache.org>
AuthorDate: Mon Dec 23 15:30:11 2019 +1100
SOLR-14125 : Streaming expressions to be loadable from packages
---
solr/CHANGES.txt | 2 +
.../java/org/apache/solr/handler/GraphHandler.java | 25 ++++-
.../org/apache/solr/handler/SolrConfigHandler.java | 11 +-
.../org/apache/solr/handler/StreamHandler.java | 82 ++++++++++----
.../org/apache/solr/pkg/PackagePluginHolder.java | 7 +-
.../src/test-files/runtimecode/expressible.jar.bin | Bin 0 -> 1977 bytes
solr/core/src/test-files/runtimecode/sig.txt | 9 ++
.../src/test/org/apache/solr/pkg/TestPackages.java | 31 ++++--
.../client/solrj/io/stream/expr/StreamFactory.java | 121 ++++++++++++---------
.../org/apache/solr/client/solrj/io/TestLang.java | 3 +-
10 files changed, 206 insertions(+), 85 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 79e2494..47edf80 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -47,6 +47,8 @@ Improvements
---------------------
* SOLR-14042: Fix varargs precommit warnings (Andraas Salamon via Jason Gerlowski)
+* SOLR-14125: Make <expressible> plugins work with packages (noble)
+
Optimizations
---------------------
(No changes)
diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
index ed5ae0a..621a4db 100644
--- a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,7 +28,9 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.Traversal;
-import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.ExceptionStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -38,6 +41,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -90,11 +95,21 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
Object functionMappingsObj = initArgs.get("streamFunctions");
if(null != functionMappingsObj){
NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
- for(Entry<String,?> functionMapping : functionMappings){
- Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(),
- Expressible.class);
- streamFactory.withFunctionName(functionMapping.getKey(), clazz);
+ for(Entry<String,?> functionMapping : functionMappings) {
+ String key = functionMapping.getKey();
+ PluginInfo pluginInfo = new PluginInfo(key, Collections.singletonMap("class", functionMapping.getValue()));
+
+ if (pluginInfo.pkgName == null) {
+ Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String) functionMapping.getValue(),
+ Expressible.class);
+ streamFactory.withFunctionName(key, clazz);
+ } else {
+ StreamHandler.ExpressibleHolder holder = new StreamHandler.ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
+ streamFactory.withFunctionName(key, () -> holder.getClazz());
+ }
+
}
+
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
index 004da31..09f2778 100644
--- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
@@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
@@ -588,9 +589,17 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
private boolean verifyClass(CommandOperation op, String clz, Class expected) {
if (clz == null) return true;
if (!"true".equals(String.valueOf(op.getStr("runtimeLib", null)))) {
+ PluginInfo info = new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap());
//this is not dynamically loaded so we can verify the class right away
try {
- req.getCore().createInitInstance(new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap()), expected, clz, "");
+ if(expected == Expressible.class) {
+ SolrResourceLoader resourceLoader = info.pkgName == null ?
+ req.getCore().getResourceLoader() :
+ req.getCore().getResourceLoader(info.pkgName);
+ resourceLoader.findClass(info.className, expected);
+ } else {
+ req.getCore().createInitInstance(info, expected, clz, "");
+ }
} catch (Exception e) {
log.error("Error checking plugin : ", e);
op.addError(e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 1502190..9fd6681 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -33,7 +33,10 @@ import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.DaemonStream;
+import org.apache.solr.client.solrj.io.stream.ExceptionStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@@ -44,6 +47,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
@@ -52,7 +56,10 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.pkg.PackageLoader;
+import org.apache.solr.pkg.PackagePluginHolder;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
@@ -87,7 +94,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
- private Map<String,DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
+ private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@@ -118,8 +125,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
// This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) {
- Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
- streamFactory.withFunctionName(pluginInfo.name, clazz);
+ if (pluginInfo.pkgName != null) {
+ ExpressibleHolder holder = new ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
+ streamFactory.withFunctionName(pluginInfo.name,
+ () -> holder.getClazz());
+ } else {
+ Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
+ streamFactory.withFunctionName(pluginInfo.name, clazz);
+ }
}
core.addCloseHook(new CloseHook() {
@@ -135,6 +148,24 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
});
}
+ public static class ExpressibleHolder extends PackagePluginHolder {
+ private Class clazz;
+
+ public ExpressibleHolder(PluginInfo info, SolrCore core, SolrConfig.SolrPluginInfo pluginMeta) {
+ super(info, core, pluginMeta);
+ }
+
+ public Class getClazz() {
+ return clazz;
+ }
+
+ @Override
+ protected void initNewInstance(PackageLoader.Package.Version newest) {
+ clazz = newest.getLoader().findClass(pluginInfo.className, Expressible.class);
+ }
+
+ }
+
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
params = adjustParams(params);
@@ -218,6 +249,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
String action = params.get("action").toLowerCase(Locale.ROOT).trim();
+ if ("plugins".equals(action)) {
+ rsp.add("plugins", (MapWriter) ew -> streamFactory.getFunctionNames().forEach((s, classSupplier) -> ew.putNoEx(s, classSupplier.get().getName())));
+ return;
+ }
if ("list".equals(action)) {
Collection<DaemonStream> vals = daemons.values();
@@ -253,10 +288,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
break;
- default:
- rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " action '"
- + action + "' not recognized on " + coreName));
- break;
+ default:
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " action '"
+ + action + "' not recognized on " + coreName));
+ break;
}
}
@@ -286,11 +321,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
- public void close() {}
+ public void close() {
+ }
- public void open() {}
+ public void open() {
+ }
- public void setStreamContext(StreamContext context) {}
+ public void setStreamContext(StreamContext context) {
+ }
public List<TupleStream> children() {
return null;
@@ -333,11 +371,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
- public void close() {}
+ public void close() {
+ }
- public void open() {}
+ public void open() {
+ }
- public void setStreamContext(StreamContext context) {}
+ public void setStreamContext(StreamContext context) {
+ }
public List<TupleStream> children() {
return null;
@@ -376,11 +417,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
- public void close() {}
+ public void close() {
+ }
- public void open() {}
+ public void open() {
+ }
- public void setStreamContext(StreamContext context) {}
+ public void setStreamContext(StreamContext context) {
+ }
public List<TupleStream> children() {
return null;
@@ -460,9 +504,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
}
- private Map<String,List<String>> getCollectionShards(SolrParams params) {
+ private Map<String, List<String>> getCollectionShards(SolrParams params) {
- Map<String,List<String>> collectionShards = new HashMap();
+ Map<String, List<String>> collectionShards = new HashMap();
Iterator<String> paramsIt = params.getParameterNamesIterator();
while (paramsIt.hasNext()) {
String param = paramsIt.next();
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackagePluginHolder.java b/solr/core/src/java/org/apache/solr/pkg/PackagePluginHolder.java
index 63facde..c6ddd4f 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackagePluginHolder.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackagePluginHolder.java
@@ -104,12 +104,17 @@ public class PackagePluginHolder<T> extends PluginBag.PluginHolder<T> {
log.info("loading plugin: {} -> {} using package {}:{}",
pluginInfo.type, pluginInfo.name, pkg.name(), newest.getVersion());
+ initNewInstance(newest);
+ pkgVersion = newest;
+
+ }
+
+ protected void initNewInstance(PackageLoader.Package.Version newest) {
Object instance = SolrCore.createInstance(pluginInfo.className,
pluginMeta.clazz, pluginMeta.getCleanTag(), core, newest.getLoader());
PluginBag.initInstance(instance, pluginInfo);
T old = inst;
inst = (T) instance;
- pkgVersion = newest;
if (old instanceof AutoCloseable) {
AutoCloseable closeable = (AutoCloseable) old;
try {
diff --git a/solr/core/src/test-files/runtimecode/expressible.jar.bin b/solr/core/src/test-files/runtimecode/expressible.jar.bin
new file mode 100644
index 0000000..50d61b9
Binary files /dev/null and b/solr/core/src/test-files/runtimecode/expressible.jar.bin differ
diff --git a/solr/core/src/test-files/runtimecode/sig.txt b/solr/core/src/test-files/runtimecode/sig.txt
index f354b5f..74bb942 100644
--- a/solr/core/src/test-files/runtimecode/sig.txt
+++ b/solr/core/src/test-files/runtimecode/sig.txt
@@ -65,6 +65,10 @@ openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem testurp_v2.jar.bin | open
P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==
+openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem expressible.jar.bin | openssl enc -base64 | openssl enc -base64 | tr -d \\n | sed
+
+ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==
+
====================sha512====================
openssl dgst -sha512 runtimelibs.jar.bin
@@ -95,6 +99,11 @@ openssl dgst -sha512 testurp_v1.jar.bin
openssl dgst -sha512 testurp_v2.jar.bin
5c4c0c454a032916e48a1c14a0fecbd6658658a66aedec5168b7222f2e3c0c63fbe09637238a9325ce2e95a2c8521834397a97701ead46c681aa20c9fccb6654
+
+openssl dgst -sha512 expressible.jar.bin
+
+3474a1414c8329c71ef5db2d3eb6e870363bdd7224a836aab561dccf5e8bcee4974ac799e72398c7e0b0c01972bab1c7454c8a4e791a8865bb676c0440627388
+
=============sha256============================
openssl dgst -sha256 runtimelibs.jar.bin
diff --git a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
index baa8a99..b714310 100644
--- a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
+++ b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
@@ -92,6 +92,7 @@ public class TestPackages extends SolrCloudTestCase {
String FILE3 = "/mypkg/runtimelibs_v3.jar";
String URP1 = "/mypkg/testurpv1.jar";
String URP2 = "/mypkg/testurpv2.jar";
+ String EXPR1 = "/mypkg/expressible.jar";
String COLLECTION_NAME = "testPluginLoadingColl";
byte[] derFile = readFile("cryptokeys/pub_key512.der");
cluster.getZkClient().makePath("/keys/exe", true);
@@ -102,10 +103,13 @@ public class TestPackages extends SolrCloudTestCase {
postFileAndWait(cluster, "runtimecode/testurp_v1.jar.bin", URP1,
"h6UmMzuPqu4hQFGLBMJh/6kDSEXpJlgLsQDXx0KuxXWkV5giilRP57K3towiJRh2J+rqihqIghNCi3YgzgUnWQ==");
+ postFileAndWait(cluster, "runtimecode/expressible.jar.bin", EXPR1,
+ "ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==");
+
Package.AddVersion add = new Package.AddVersion();
add.version = "1.0";
add.pkg = "mypkg";
- add.files = Arrays.asList(new String[]{FILE1, URP1});
+ add.files = Arrays.asList(new String[]{FILE1, URP1, EXPR1});
V2Request req = new V2Request.Builder("/cluster/package")
.forceV2(true)
.withMethod(SolrRequest.METHOD.POST)
@@ -134,7 +138,8 @@ public class TestPackages extends SolrCloudTestCase {
"'create-requesthandler' : { 'name' : '/runtime', 'class': 'mypkg:org.apache.solr.core.RuntimeLibReqHandler' }," +
"'create-searchcomponent' : { 'name' : 'get', 'class': 'mypkg:org.apache.solr.core.RuntimeLibSearchComponent' }," +
"'create-queryResponseWriter' : { 'name' : 'json1', 'class': 'mypkg:org.apache.solr.core.RuntimeLibResponseWriter' }" +
- "'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }" +
+ "'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }," +
+ " create-expressible: {name: mincopy , class: 'mypkg:org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric'}" +
"}";
cluster.getSolrClient().request(new ConfigRequest(payload) {
@Override
@@ -159,6 +164,20 @@ public class TestPackages extends SolrCloudTestCase {
COLLECTION_NAME, "updateProcessor", "myurp",
"mypkg", "1.0" );
+ verifyCmponent(cluster.getSolrClient(),
+ COLLECTION_NAME, "expressible", "mincopy",
+ "mypkg", "1.0" );
+
+ TestDistribPackageStore.assertResponseValues(10,
+ cluster.getSolrClient() ,
+ new GenericSolrRequest(SolrRequest.METHOD.GET,
+ "/stream", new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
+ WT, JAVABIN,
+ "action", "plugins"
+ ))), Utils.makeMap(
+ ":plugins:mincopy", "org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric"
+ ));
+
UpdateRequest ur = new UpdateRequest();
ur.add(new SolrInputDocument("id", "1"));
ur.setParam("processor", "myurp");
@@ -192,7 +211,7 @@ public class TestPackages extends SolrCloudTestCase {
"P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==");
//add the version using package API
add.version = "1.1";
- add.files = Arrays.asList(new String[]{FILE2,URP2});
+ add.files = Arrays.asList(new String[]{FILE2,URP2, EXPR1});
req.process(cluster.getSolrClient());
verifyCmponent(cluster.getSolrClient(),
@@ -222,7 +241,7 @@ public class TestPackages extends SolrCloudTestCase {
"a400n4T7FT+2gM0SC6+MfSOExjud8MkhTSFylhvwNjtWwUgKdPFn434Wv7Qc4QEqDVLhQoL3WqYtQmLPti0G4Q==");
add.version = "2.1";
- add.files = Arrays.asList(new String[]{FILE3, URP2});
+ add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient());
//now let's verify that the classes are updated
@@ -304,7 +323,7 @@ public class TestPackages extends SolrCloudTestCase {
}.process(cluster.getSolrClient()) ;
add.version = "2.1";
- add.files = Arrays.asList(new String[]{FILE3, URP2});
+ add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient());
//the collections mypkg is set to use version 1.1
@@ -368,7 +387,6 @@ public class TestPackages extends SolrCloudTestCase {
}
}
-
private void executeReq(String uri, JettySolrRunner jetty, Utils.InputStreamConsumer parser, Map expected) throws Exception {
try(HttpSolrClient client = (HttpSolrClient) jetty.newClient()){
TestDistribPackageStore.assertResponseValues(10,
@@ -390,7 +408,6 @@ public class TestPackages extends SolrCloudTestCase {
"componentName", componentName,
"meta", "true"));
- String s = "queryResponseWriter";
GenericSolrRequest req1 = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/config/" + componentType, params);
TestDistribPackageStore.assertResponseValues(10,
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index 0ba5d5c..4e176dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -21,11 +21,13 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -44,7 +46,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric;
public class StreamFactory implements Serializable {
private transient HashMap<String,String> collectionZkHosts;
- private transient HashMap<String,Class<? extends Expressible>> functionNames;
+ private transient HashMap<String,Supplier<Class<? extends Expressible>>> functionNames;
private transient String defaultZkHost;
private transient String defaultCollection;
@@ -79,14 +81,20 @@ public class StreamFactory implements Serializable {
return null;
}
- public Map<String,Class<? extends Expressible>> getFunctionNames(){
- return functionNames;
+ public Map<String, Supplier<Class<? extends Expressible>>> getFunctionNames() {
+ return Collections.unmodifiableMap(functionNames);
}
public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz){
+ this.functionNames.put(functionName, () -> clazz);
+ return this;
+ }
+
+ public StreamFactory withFunctionName(String functionName, Supplier< Class<? extends Expressible>> clazz){
this.functionNames.put(functionName, clazz);
return this;
}
-
+
+
public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){
return null;
@@ -173,14 +181,15 @@ public class StreamFactory implements Serializable {
List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
parameterLoop:
- for(StreamExpression streamExpression : allStreamExpressions){
- if(functionNames.containsKey(streamExpression.getFunctionName())){
- for(Class clazz : clazzes){
- if(!clazz.isAssignableFrom(functionNames.get(streamExpression.getFunctionName()))){
+ for(StreamExpression streamExpression : allStreamExpressions) {
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(streamExpression.getFunctionName());
+ if (classSupplier != null) {
+ for (Class clazz : clazzes) {
+ if (!clazz.isAssignableFrom(classSupplier.get())) {
continue parameterLoop;
}
}
-
+
matchingStreamExpressions.add(streamExpression);
}
}
@@ -189,9 +198,10 @@ public class StreamFactory implements Serializable {
}
public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){
- if(functionNames.containsKey(expression.getFunctionName())){
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(expression.getFunctionName());
+ if(classSupplier != null){
for(Class clazz : clazzes){
- if(!clazz.isAssignableFrom(functionNames.get(expression.getFunctionName()))){
+ if(!clazz.isAssignableFrom(classSupplier.get())){
return false;
}
}
@@ -203,7 +213,7 @@ public class StreamFactory implements Serializable {
public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{
StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
-
+
if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
if(null != defaultValue){
return defaultValue;
@@ -241,10 +251,12 @@ public class StreamFactory implements Serializable {
}
public TupleStream constructStream(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+
+ if(classSupplier != null){
+ Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
- return (TupleStream)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+ return (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
@@ -256,10 +268,11 @@ public class StreamFactory implements Serializable {
}
public Metric constructMetric(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+ if(classSupplier != null){
+ Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){
- return (Metric)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+ return (Metric)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
@@ -356,16 +369,18 @@ public class StreamFactory implements Serializable {
public Metric constructOperation(String expressionClause) throws IOException {
return constructMetric(StreamExpressionParser.parse(expressionClause));
}
- public StreamOperation constructOperation(StreamExpression expression) throws IOException{
+
+ public StreamOperation constructOperation(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
- if(Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)){
- return (StreamOperation)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+ if (classSupplier != null) {
+ Class<? extends Expressible> clazz = classSupplier.get();
+ if (Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)) {
+ return (StreamOperation) createInstance(clazz, new Class[]{StreamExpression.class, StreamFactory.class}, new Object[]{expression, this});
}
}
-
- throw new IOException(String.format(Locale.ROOT,"Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName()));
+
+ throw new IOException(String.format(Locale.ROOT, "Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName()));
}
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException {
@@ -373,33 +388,37 @@ public class StreamFactory implements Serializable {
}
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+
+ if(classSupplier != null){
+ Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
- return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+ return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
}
- public boolean isStream(StreamExpression expression) throws IOException{
+ public boolean isStream(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
- if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+ if (classSupplier != null) {
+ Class<? extends Expressible> clazz = classSupplier.get();
+ if (Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)) {
return true;
}
}
return false;
}
-
- public boolean isEvaluator(StreamExpression expression) throws IOException{
+
+ public boolean isEvaluator(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
- if(functionNames.containsKey(function)){
- Class<? extends Expressible> clazz = functionNames.get(function);
- if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
+ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
+ if (classSupplier != null) {
+ Class<? extends Expressible> clazz = classSupplier.get();
+ if (Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)) {
return true;
}
}
@@ -407,35 +426,35 @@ public class StreamFactory implements Serializable {
return false;
}
- public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
+ public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException {
Constructor<T> ctor;
try {
ctor = clazz.getConstructor(paramTypes);
return ctor.newInstance(params);
-
+
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- if(null != e.getMessage()){
- throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()),e);
- }
- else{
- throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
+ if (null != e.getMessage()) {
+ throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()), e);
+ } else {
+ throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s", clazz.getName()), e);
}
}
}
-
- public String getFunctionName(Class<? extends Expressible> clazz) throws IOException{
- for(Entry<String,Class<? extends Expressible>> entry : functionNames.entrySet()){
- if(entry.getValue() == clazz){
+
+ public String getFunctionName(Class<? extends Expressible> clazz) throws IOException {
+ for (Entry<String, Supplier<Class<? extends Expressible>>> entry : functionNames.entrySet()) {
+ if (entry.getValue().get() == clazz) {
return entry.getKey();
}
}
-
+
+
throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
}
-
+
public Object constructPrimitiveObject(String original){
String lower = original.trim().toLowerCase(Locale.ROOT);
-
+
if("null".equals(lower)){ return null; }
if("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); }
try{ return Long.valueOf(original); } catch(Exception ignored){};
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 7e510f4..025f6b2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -94,7 +95,7 @@ public class TestLang extends SolrTestCase {
}
StreamFactory factory = new StreamFactory();
Lang.register(factory);
- Map<String,Class<? extends Expressible>> registeredFunctions = factory.getFunctionNames();
+ Map<String, Supplier<Class<? extends Expressible>>> registeredFunctions = factory.getFunctionNames();
//Check that each function that is expected is registered.
for(String func : functions) {