You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/14 19:11:39 UTC
svn commit: r1679407 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/
solr/core/src/java/org/apache/solr/response/ solr/server/ solr/solrj/
solr/solrj/src/java/org/apache/solr/client/solrj/io/ solr/so...
Author: jbernste
Date: Thu May 14 17:11:38 2015
New Revision: 1679407
URL: http://svn.apache.org/r1679407
Log:
SOLR-7377,SOLR-7524:Make Streaming Expressions Java 7 Compatible
Added:
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
- copied from r1678743, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
- copied from r1678743, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
- copied from r1678743, lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
Removed:
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/JSONTupleStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/RankStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/TupleStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java
lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
lucene/dev/branches/branch_5x/solr/server/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Thu May 14 17:11:38 2015
@@ -17,32 +17,79 @@
package org.apache.solr.handler;
+import java.util.Map.Entry;
+import java.net.URLDecoder;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
-import java.net.URLDecoder;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.client.solrj.io.SolrClientCache;
-import org.apache.solr.client.solrj.io.TupleStream;
-import org.apache.solr.client.solrj.io.StreamContext;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.solr.common.util.Base64;
-
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
private SolrClientCache clientCache = new SolrClientCache();
-
+ private StreamFactory streamFactory = new StreamFactory();
+
public void inform(SolrCore core) {
-
- core.addCloseHook( new CloseHook() {
+
+ /* When the core is ZooKeeper-aware, the stream factory is given the zkHost
+ * of this core's collection. Default streams are then added under their
+ * function names. These defaults can be overridden or added to in the stream
+ * RequestHandler definition in the Solr config. Example config override:
+ * <lst name="streamFunctions">
+ * <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
+ * <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+ * </lst>
+ * */
+
+ String defaultCollection = null;
+ String defaultZkhost = null;
+ CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+
+ if(coreContainer.isZooKeeperAware()) {
+ defaultCollection = core.getCoreDescriptor().getCollectionName();
+ defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+ streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+ }
+
+ streamFactory
+ .withStreamFunction("search", CloudSolrStream.class)
+ .withStreamFunction("merge", MergeStream.class)
+ .withStreamFunction("unique", UniqueStream.class)
+ .withStreamFunction("top", RankStream.class)
+ .withStreamFunction("group", ReducerStream.class)
+ .withStreamFunction("parallel", ParallelStream.class);
+
+
+ // This pulls all the overrides and additions from the config
+ Object functionMappingsObj = initArgs.get("streamFunctions");
+ if(null != functionMappingsObj){
+ NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
+ for(Entry<String,?> functionMapping : functionMappings){
+ Class<?> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class);
+ streamFactory.withStreamFunction(functionMapping.getKey(), clazz);
+ }
+ }
+
+ core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
//To change body of implemented methods use File | Settings | File Templates.
@@ -57,15 +104,23 @@ public class StreamHandler extends Reque
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
- String encodedStream = params.get("stream");
- encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
- byte[] bytes = Base64.base64ToByteArray(encodedStream);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
- TupleStream tupleStream = (TupleStream)objectInputStream.readObject();
- int worker = params.getInt("workerID");
- int numWorkers = params.getInt("numWorkers");
+ boolean objectSerialize = params.getBool("objectSerialize", false);
+ TupleStream tupleStream = null;
+
+ if(objectSerialize) {
+ String encodedStream = params.get("stream");
+ encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
+ byte[] bytes = Base64.base64ToByteArray(encodedStream);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
+ tupleStream = (TupleStream)objectInputStream.readObject();
+ } else {
+ tupleStream = this.streamFactory.constructStream(params.get("stream"));
+ }
+
+ int worker = params.getInt("workerID", 0);
+ int numWorkers = params.getInt("numWorkers", 1);
StreamContext context = new StreamContext();
context.workerID = worker;
context.numWorkers = numWorkers;
@@ -81,4 +136,4 @@ public class StreamHandler extends Reque
public String getSource() {
return null;
}
-}
\ No newline at end of file
+}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Thu May 14 17:11:38 2015
@@ -23,7 +23,7 @@ import java.util.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
-import org.apache.solr.client.solrj.io.TupleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.EnumFieldValue;
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Thu May 14 17:11:38 2015
@@ -93,11 +93,11 @@ public class Tuple implements Cloneable
this.fields.put("_MAPS_", maps);
}
- public Map<String,Tuple> getMetrics() {
- return (Map<String,Tuple>)this.fields.get("_METRICS_");
+ public Map<String,Map> getMetrics() {
+ return (Map<String,Map>)this.fields.get("_METRICS_");
}
- public void setMetrics(Map<String, Tuple> metrics) {
+ public void setMetrics(Map<String, Map> metrics) {
this.fields.put("_METRICS_", metrics);
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java Thu May 14 17:11:38 2015
@@ -1,5 +1,7 @@
package org.apache.solr.client.solrj.io.comp;
+import java.io.Serializable;
+
import org.apache.solr.client.solrj.io.Tuple;
/*
@@ -22,6 +24,6 @@ import org.apache.solr.client.solrj.io.T
/**
* Interface for use with a comparator lambda
*/
-public interface ComparatorLambda {
+public interface ComparatorLambda extends Serializable {
public int compare(Tuple leftTuple, Tuple rightTuple);
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java Thu May 14 17:11:38 2015
@@ -70,22 +70,21 @@ public class FieldComparator extends Str
*/
private void assignComparator(){
if(ComparatorOrder.DESCENDING == order){
- // What black magic is this type intersection??
- // Because this class is serializable we need to make sure the lambda is also serializable.
- // This can be done by providing this type intersection on the definition of the lambda.
- // Why not do it in the lambda interface? Functional Interfaces don't allow extends clauses
- comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
- Comparable leftComp = (Comparable)leftTuple.get(leftField);
- Comparable rightComp = (Comparable)rightTuple.get(rightField);
- return rightComp.compareTo(leftComp);
+ comparator = new ComparatorLambda() {
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ Comparable leftComp = (Comparable)leftTuple.get(leftField);
+ Comparable rightComp = (Comparable)rightTuple.get(rightField);
+ return rightComp.compareTo(leftComp);
+ }
};
}
else{
- // See above for black magic reasoning.
- comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
- Comparable leftComp = (Comparable)leftTuple.get(leftField);
- Comparable rightComp = (Comparable)rightTuple.get(rightField);
- return leftComp.compareTo(rightComp);
+ comparator = new ComparatorLambda() {
+ public int compare(Tuple leftTuple, Tuple rightTuple) {
+ Comparable leftComp = (Comparable)leftTuple.get(leftField);
+ Comparable rightComp = (Comparable)rightTuple.get(rightField);
+ return leftComp.compareTo(rightComp);
+ }
};
}
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Thu May 14 17:11:38 2015
@@ -54,7 +54,10 @@ public class StreamFactory implements Se
return this;
}
public String getCollectionZkHost(String collectionName){
- return this.collectionZkHosts.getOrDefault(collectionName, null);
+ if(this.collectionZkHosts.containsKey(collectionName)){
+ return this.collectionZkHosts.get(collectionName);
+ }
+ return null;
}
public Map<String,Class> getStreamFunctions(){
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml Thu May 14 17:11:38 2015
@@ -52,8 +52,11 @@
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
+ <lst name="streamFunctions">
+ <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+ </lst>
</requestHandler>
-
+
<requestDispatcher handleSelect="true" >
<requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="2048" />
</requestDispatcher>