You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/14 19:11:39 UTC

svn commit: r1679407 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/core/src/java/org/apache/solr/response/ solr/server/ solr/solrj/ solr/solrj/src/java/org/apache/solr/client/solrj/io/ solr/so...

Author: jbernste
Date: Thu May 14 17:11:38 2015
New Revision: 1679407

URL: http://svn.apache.org/r1679407
Log:
SOLR-7377,SOLR-7524:Make Streaming Expressions Java 7 Compatible

Added:
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
      - copied from r1678743, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
      - copied from r1678743, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
      - copied from r1678743, lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
Removed:
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/JSONTupleStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/RankStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/TupleStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
    lucene/dev/branches/branch_5x/solr/server/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
    lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Thu May 14 17:11:38 2015
@@ -17,32 +17,79 @@
 
 package org.apache.solr.handler;
 
+import java.util.Map.Entry;
+import java.net.URLDecoder;
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
-import java.net.URLDecoder;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.solr.client.solrj.io.SolrClientCache;
-import org.apache.solr.client.solrj.io.TupleStream;
-import org.apache.solr.client.solrj.io.StreamContext;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.apache.solr.common.util.Base64;
 
-
 public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
 
   private SolrClientCache clientCache = new SolrClientCache();
-
+  private StreamFactory streamFactory = new StreamFactory();
+  
   public void inform(SolrCore core) {
-
-    core.addCloseHook( new CloseHook() {
+    
+    /* The stream factory will always contain the zkUrl for the given collection
+     * Adds default streams with their corresponding function names. These 
+     * defaults can be overridden or added to in the solrConfig in the stream 
+     * RequestHandler def. Example config override
+     *  <lst name="streamFunctions">
+     *    <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
+     *    <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+     *  </lst>
+     * */
+
+    String defaultCollection = null;
+    String defaultZkhost     = null;
+    CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+
+    if(coreContainer.isZooKeeperAware()) {
+      defaultCollection = core.getCoreDescriptor().getCollectionName();
+      defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+      streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+    }
+
+     streamFactory
+      .withStreamFunction("search", CloudSolrStream.class)
+      .withStreamFunction("merge", MergeStream.class)
+      .withStreamFunction("unique", UniqueStream.class)
+      .withStreamFunction("top", RankStream.class)
+      .withStreamFunction("group", ReducerStream.class)
+      .withStreamFunction("parallel", ParallelStream.class);
+
+    
+    // This pulls all the overrides and additions from the config
+    Object functionMappingsObj = initArgs.get("streamFunctions");
+    if(null != functionMappingsObj){
+      NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
+      for(Entry<String,?> functionMapping : functionMappings){
+        Class<?> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class);
+        streamFactory.withStreamFunction(functionMapping.getKey(), clazz);
+      }
+    }
+        
+    core.addCloseHook(new CloseHook() {
       @Override
       public void preClose(SolrCore core) {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -57,15 +104,23 @@ public class StreamHandler extends Reque
 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     SolrParams params = req.getParams();
-    String encodedStream = params.get("stream");
-    encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
-    byte[] bytes = Base64.base64ToByteArray(encodedStream);
-    ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
-    ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
-    TupleStream tupleStream = (TupleStream)objectInputStream.readObject();
 
-    int worker = params.getInt("workerID");
-    int numWorkers = params.getInt("numWorkers");
+    boolean objectSerialize = params.getBool("objectSerialize", false);
+    TupleStream tupleStream = null;
+
+    if(objectSerialize) {
+      String encodedStream = params.get("stream");
+      encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
+      byte[] bytes = Base64.base64ToByteArray(encodedStream);
+      ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+      ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
+      tupleStream = (TupleStream)objectInputStream.readObject();
+    } else {
+      tupleStream = this.streamFactory.constructStream(params.get("stream"));
+    }
+
+    int worker = params.getInt("workerID", 0);
+    int numWorkers = params.getInt("numWorkers", 1);
     StreamContext context = new StreamContext();
     context.workerID = worker;
     context.numWorkers = numWorkers;
@@ -81,4 +136,4 @@ public class StreamHandler extends Reque
   public String getSource() {
     return null;
   }
-}
\ No newline at end of file
+}

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Thu May 14 17:11:38 2015
@@ -23,7 +23,7 @@ import java.util.*;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexableField;
-import org.apache.solr.client.solrj.io.TupleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.EnumFieldValue;

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Thu May 14 17:11:38 2015
@@ -93,11 +93,11 @@ public class Tuple implements Cloneable
     this.fields.put("_MAPS_", maps);
   }
 
-  public Map<String,Tuple> getMetrics() {
-    return (Map<String,Tuple>)this.fields.get("_METRICS_");
+  public Map<String,Map> getMetrics() {
+    return (Map<String,Map>)this.fields.get("_METRICS_");
   }
 
-  public void setMetrics(Map<String, Tuple> metrics) {
+  public void setMetrics(Map<String, Map> metrics) {
     this.fields.put("_METRICS_", metrics);
   }
 

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java Thu May 14 17:11:38 2015
@@ -1,5 +1,7 @@
 package org.apache.solr.client.solrj.io.comp;
 
+import java.io.Serializable;
+
 import org.apache.solr.client.solrj.io.Tuple;
 
 /*
@@ -22,6 +24,6 @@ import org.apache.solr.client.solrj.io.T
 /**
  * Interface for use with a comparator lambda
  */
-public interface ComparatorLambda {
+public interface ComparatorLambda extends Serializable {
   public int compare(Tuple leftTuple, Tuple rightTuple);
 }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java Thu May 14 17:11:38 2015
@@ -70,22 +70,21 @@ public class FieldComparator extends Str
    */
   private void assignComparator(){
     if(ComparatorOrder.DESCENDING == order){
-      // What black magic is this type intersection??
-      // Because this class is serializable we need to make sure the lambda is also serializable.
-      // This can be done by providing this type intersection on the definition of the lambda.
-      // Why not do it in the lambda interface? Functional Interfaces don't allow extends clauses
-      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
-        Comparable leftComp = (Comparable)leftTuple.get(leftField);
-        Comparable rightComp = (Comparable)rightTuple.get(rightField);
-        return rightComp.compareTo(leftComp);
+      comparator = new ComparatorLambda() {
+        public int compare(Tuple leftTuple, Tuple rightTuple) {
+          Comparable leftComp = (Comparable)leftTuple.get(leftField);
+          Comparable rightComp = (Comparable)rightTuple.get(rightField);
+          return rightComp.compareTo(leftComp);
+        }
       };
     }
     else{
-      // See above for black magic reasoning.
-      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
-        Comparable leftComp = (Comparable)leftTuple.get(leftField);
-        Comparable rightComp = (Comparable)rightTuple.get(rightField);
-        return leftComp.compareTo(rightComp);
+      comparator = new ComparatorLambda() {
+        public int compare(Tuple leftTuple, Tuple rightTuple) {
+          Comparable leftComp = (Comparable)leftTuple.get(leftField);
+          Comparable rightComp = (Comparable)rightTuple.get(rightField);
+          return leftComp.compareTo(rightComp);
+        }
       };
     }
   }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1679407&r1=1678743&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Thu May 14 17:11:38 2015
@@ -54,7 +54,10 @@ public class StreamFactory implements Se
     return this;
   }
   public String getCollectionZkHost(String collectionName){
-    return this.collectionZkHosts.getOrDefault(collectionName, null);
+    if(this.collectionZkHosts.containsKey(collectionName)){
+      return this.collectionZkHosts.get(collectionName);
+    }
+    return null;
   }
   
   public Map<String,Class> getStreamFunctions(){

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml?rev=1679407&r1=1679406&r2=1679407&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml Thu May 14 17:11:38 2015
@@ -52,8 +52,11 @@
       <str name="wt">json</str>
       <str name="distrib">false</str>
     </lst>
+    <lst name="streamFunctions">
+      <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+    </lst>
   </requestHandler>
-
+  
   <requestDispatcher handleSelect="true" >
     <requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="2048" />
   </requestDispatcher>