You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/11 14:37:19 UTC
svn commit: r1678743 [1/3] - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/handler/
core/src/java/org/apache/solr/response/
server/solr/configsets/data_driven_schema_configs/conf/
solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/jav...
Author: jbernste
Date: Mon May 11 12:37:18 2015
New Revision: 1678743
URL: http://svn.apache.org/r1678743
Log:
SOLR-7377: Streaming Expressions
Added:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java (with props)
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java (with props)
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java (with props)
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java (with props)
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java (with props)
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (with props)
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParserTest.java (with props)
Removed:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/AscFieldComp.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/CloudSolrStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/DescFieldComp.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/JSONTupleStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/MergeStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/MultiComp.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ParallelStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/PushBackStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/RankStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/ReducerStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/StreamContext.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/TupleStream.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/UniqueStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/CountStream.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/StreamingTest.java
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Mon May 11 12:37:18 2015
@@ -17,32 +17,79 @@
package org.apache.solr.handler;
+import java.util.Map.Entry;
+import java.net.URLDecoder;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
-import java.net.URLDecoder;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.client.solrj.io.SolrClientCache;
-import org.apache.solr.client.solrj.io.TupleStream;
-import org.apache.solr.client.solrj.io.StreamContext;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.solr.common.util.Base64;
-
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
private SolrClientCache clientCache = new SolrClientCache();
-
+ private StreamFactory streamFactory = new StreamFactory();
+
public void inform(SolrCore core) {
-
- core.addCloseHook( new CloseHook() {
+
+ /* The stream factory will always contain the zkUrl for the given collection
+ * Adds default streams with their corresponding function names. These
+ * defaults can be overridden or added to in the solrConfig in the stream
+ * RequestHandler def. Example config override
+ * <lst name="streamFunctions">
+ * <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
+ * <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+ * </lst>
+ * */
+
+ String defaultCollection = null;
+ String defaultZkhost = null;
+ CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+
+ if(coreContainer.isZooKeeperAware()) {
+ defaultCollection = core.getCoreDescriptor().getCollectionName();
+ defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+ streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+ }
+
+ streamFactory
+ .withStreamFunction("search", CloudSolrStream.class)
+ .withStreamFunction("merge", MergeStream.class)
+ .withStreamFunction("unique", UniqueStream.class)
+ .withStreamFunction("top", RankStream.class)
+ .withStreamFunction("group", ReducerStream.class)
+ .withStreamFunction("parallel", ParallelStream.class);
+
+
+ // This pulls all the overrides and additions from the config
+ Object functionMappingsObj = initArgs.get("streamFunctions");
+ if(null != functionMappingsObj){
+ NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
+ for(Entry<String,?> functionMapping : functionMappings){
+ Class<?> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class);
+ streamFactory.withStreamFunction(functionMapping.getKey(), clazz);
+ }
+ }
+
+ core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
//To change body of implemented methods use File | Settings | File Templates.
@@ -57,15 +104,23 @@ public class StreamHandler extends Reque
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
- String encodedStream = params.get("stream");
- encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
- byte[] bytes = Base64.base64ToByteArray(encodedStream);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
- TupleStream tupleStream = (TupleStream)objectInputStream.readObject();
- int worker = params.getInt("workerID");
- int numWorkers = params.getInt("numWorkers");
+ boolean objectSerialize = params.getBool("objectSerialize", false);
+ TupleStream tupleStream = null;
+
+ if(objectSerialize) {
+ String encodedStream = params.get("stream");
+ encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
+ byte[] bytes = Base64.base64ToByteArray(encodedStream);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
+ tupleStream = (TupleStream)objectInputStream.readObject();
+ } else {
+ tupleStream = this.streamFactory.constructStream(params.get("stream"));
+ }
+
+ int worker = params.getInt("workerID", 0);
+ int numWorkers = params.getInt("numWorkers", 1);
StreamContext context = new StreamContext();
context.workerID = worker;
context.numWorkers = numWorkers;
@@ -81,4 +136,4 @@ public class StreamHandler extends Reque
public String getSource() {
return null;
}
-}
\ No newline at end of file
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Mon May 11 12:37:18 2015
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.io.Writer;
import java.util.*;
-import org.apache.solr.client.solrj.io.TupleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.lucene.index.StorableField;
import org.apache.lucene.index.StoredDocument;
Modified: lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml (original)
+++ lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml Mon May 11 12:37:18 2015
@@ -854,6 +854,37 @@
</lst>
</requestHandler>
+
+ <!--
+ The export request handler is used to export full sorted result sets.
+ Do not change these defaults.
+ -->
+
+ <requestHandler name="/export" class="solr.SearchHandler">
+ <lst name="invariants">
+ <str name="rq">{!xport}</str>
+ <str name="wt">xsort</str>
+ <str name="distrib">false</str>
+ </lst>
+
+ <arr name="components">
+ <str>query</str>
+ </arr>
+ </requestHandler>
+
+
+ <!--
+ Distributed Stream processing.
+ -->
+
+ <requestHandler name="/stream" class="solr.StreamHandler">
+ <lst name="invariants">
+ <str name="wt">json</str>
+ <str name="distrib">false</str>
+ </lst>
+ </requestHandler>
+
+
<!-- A Robust Example
This example SearchHandler declaration shows off usage of the
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Mon May 11 12:37:18 2015
@@ -93,11 +93,11 @@ public class Tuple implements Cloneable
this.fields.put("_MAPS_", maps);
}
- public Map<String,Tuple> getMetrics() {
- return (Map<String,Tuple>)this.fields.get("_METRICS_");
+ public Map<String,Map> getMetrics() {
+ return (Map<String,Map>)this.fields.get("_METRICS_");
}
- public void setMetrics(Map<String, Tuple> metrics) {
+ public void setMetrics(Map<String, Map> metrics) {
this.fields.put("_METRICS_", metrics);
}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorLambda.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import org.apache.solr.client.solrj.io.Tuple;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interface for use with a comparator lambda: holds the comparison logic for two Tuples.
+ * <p>
+ * Annotated with {@link FunctionalInterface} so the compiler rejects any change that would
+ * stop this type being assignable from a lambda (FieldComparator builds its comparison
+ * logic as lambdas of this type).
+ */
+@FunctionalInterface
+public interface ComparatorLambda {
+
+  /**
+   * Compares two tuples.
+   *
+   * @param leftTuple the first tuple to compare
+   * @param rightTuple the second tuple to compare
+   * @return a negative integer, zero, or a positive integer as the first tuple sorts
+   *         before, equal to, or after the second
+   */
+  public int compare(Tuple leftTuple, Tuple rightTuple);
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ComparatorOrder.java Mon May 11 12:37:18 2015
@@ -0,0 +1,48 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.util.Locale;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Enum for supported comparator ordering.
+ * <p>
+ * {@link #fromString(String)} and {@link #toString()} round-trip the short forms
+ * {@code "asc"} and {@code "desc"}.
+ */
+public enum ComparatorOrder {
+  ASCENDING, DESCENDING;
+
+  /**
+   * Parses an order name, ignoring case.
+   *
+   * @param order {@code "asc"} or {@code "desc"}, in any case
+   * @return the matching order
+   * @throws IllegalArgumentException if {@code order} is null or not a recognized order
+   */
+  public static ComparatorOrder fromString(String order){
+    if(null != order){
+      switch(order.toLowerCase(Locale.ROOT)){
+        case "asc":
+          return ComparatorOrder.ASCENDING;
+        case "desc":
+          return ComparatorOrder.DESCENDING;
+        default:
+          break;
+      }
+    }
+    // Unknown values and null both end up here, so callers get one exception type and
+    // a message naming the bad input (instead of a bare NPE from toLowerCase for null).
+    throw new IllegalArgumentException(String.format(Locale.ROOT,"Unknown order '%s'", order));
+  }
+
+  /**
+   * Returns the short form of this order, as accepted by {@link #fromString(String)}.
+   *
+   * @return {@code "desc"} for DESCENDING, {@code "asc"} otherwise
+   */
+  @Override
+  public String toString(){
+    switch(this){
+      case DESCENDING:
+        return "desc";
+      default:
+        return "asc";
+    }
+  }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,33 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A comparator that can be written back out as a stream expression parameter.
+ */
+public interface ExpressibleComparator {
+
+  /**
+   * Converts this comparator into its stream expression form.
+   *
+   * @param factory the {@link StreamFactory} in use; implementations may ignore it
+   * @return the expression parameter describing this comparator
+   * @throws IOException if this comparator cannot be expressed as an expression
+   */
+  public abstract StreamExpressionParameter toExpression(StreamFactory factory) throws IOException;
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * A field Comparator which compares a field of two Tuples and determines their sort order.
+ * <p>
+ * The value of {@code leftField} in the left tuple is compared with the value of
+ * {@code rightField} in the right tuple. Both name the same field when the single-field
+ * constructor is used. {@link ComparatorOrder#DESCENDING} reverses the natural ordering of
+ * the values; any other order sorts them ascending.
+ * <p>
+ * NOTE(review): the values are cast to raw {@link Comparable}. A missing field (if
+ * {@code Tuple.get} returns null for it) will presumably fail with a NullPointerException.
+ * Two values of incompatible types will presumably fail with a ClassCastException.
+ * Confirm this is acceptable to callers.
+ **/
+public class FieldComparator extends StreamComparator implements Comparator<Tuple>, ExpressibleComparator, Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  /** Comparison logic chosen once, at construction, according to the sort order. */
+  private ComparatorLambda comparator;
+
+  /**
+   * Compares the same field of both tuples.
+   *
+   * @param field the field to read from each tuple
+   * @param order the sort order
+   */
+  public FieldComparator(String field, ComparatorOrder order) {
+    super(field, order);
+    assignComparator();
+  }
+
+  /**
+   * Compares {@code leftField} of the left tuple with {@code rightField} of the right tuple.
+   *
+   * @param leftField the field to read from the left tuple
+   * @param rightField the field to read from the right tuple
+   * @param order the sort order
+   */
+  public FieldComparator(String leftField, String rightField, ComparatorOrder order){
+    super(leftField,rightField,order);
+    assignComparator();
+  }
+
+  /**
+   * Renders this comparator as {@code "field order"}.
+   * When the two field names differ, the form is {@code "leftField=rightField order"} instead.
+   *
+   * @param factory unused by this implementation
+   * @return the expression value describing this comparator
+   */
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory){
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(leftField);
+
+    if(!leftField.equals(rightField)){
+      sb.append("=");
+      sb.append(rightField);
+    }
+
+    sb.append(" ");
+    // ComparatorOrder.toString() yields the short form ("asc" / "desc").
+    sb.append(order);
+
+    return new StreamExpressionValue(sb.toString());
+  }
+
+  /*
+   * Why lambdas for the comparator logic?
+   * compare(...) is called many, many times over the lifetime of this object, so it should do
+   * as little work as possible. Ascending and descending comparisons differ slightly.
+   * Rather than evaluating if(ascending){ ... } else { ... } on every call, this method makes
+   * that decision once and stores a lambda for the chosen branch. compare(...) then simply
+   * invokes the stored lambda.
+   *
+   * The raw Comparable casts are inherent to comparing untyped tuple values. Their
+   * unchecked/rawtypes warnings are suppressed on this method only, which is the narrowest
+   * declaration that encloses the lambda bodies.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private void assignComparator(){
+    if(ComparatorOrder.DESCENDING == order){
+      // This class is Serializable, so the stored lambda must be serializable too.
+      // The intersection cast (ComparatorLambda & Serializable) makes it so without requiring
+      // ComparatorLambda itself to extend Serializable.
+      // The lambda reads leftField/rightField through 'this', so it captures this comparator.
+      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
+        Comparable leftComp = (Comparable)leftTuple.get(leftField);
+        Comparable rightComp = (Comparable)rightTuple.get(rightField);
+        // Operands swapped to reverse the natural order.
+        return rightComp.compareTo(leftComp);
+      };
+    }
+    else{
+      // Same serializable intersection cast as above; natural (ascending) order.
+      comparator = (ComparatorLambda & Serializable)(leftTuple, rightTuple) -> {
+        Comparable leftComp = (Comparable)leftTuple.get(leftField);
+        Comparable rightComp = (Comparable)rightTuple.get(rightField);
+        return leftComp.compareTo(rightComp);
+      };
+    }
+  }
+
+  /**
+   * Compares two tuples using the logic chosen at construction.
+   *
+   * @param leftTuple the first tuple
+   * @param rightTuple the second tuple
+   * @return negative, zero or positive as {@code leftTuple} sorts before, equal to or after
+   *         {@code rightTuple}
+   */
+  @Override
+  public int compare(Tuple leftTuple, Tuple rightTuple) {
+    return comparator.compare(leftTuple, rightTuple);
+  }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java Mon May 11 12:37:18 2015
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ * Wraps multiple Comparators to provide sub-sorting.
+ * <p>
+ * The comparators are consulted in order and the first non-zero result decides.
+ * If every comparator reports zero, or there are none, the tuples compare equal.
+ **/
+
+public class MultiComp implements Comparator<Tuple>, ExpressibleComparator, Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  private Comparator<Tuple>[] comps;
+
+  /**
+   * Creates a comparator that applies {@code comps} in priority order.
+   * <p>
+   * {@code @SafeVarargs} is sound here because the array is only ever read as
+   * {@code Comparator<Tuple>} elements and is never exposed.
+   *
+   * @param comps the comparators to apply, highest priority first. The array is copied, so
+   *        later writes to a caller-owned array do not change this comparator's ordering.
+   * @throws NullPointerException if {@code comps} is null (previously this failed on first use)
+   */
+  @SafeVarargs
+  public MultiComp(Comparator<Tuple>... comps) {
+    this.comps = comps.clone();
+  }
+
+  /**
+   * Compares two tuples with each wrapped comparator in turn.
+   *
+   * @return the first non-zero result, or 0 if all wrapped comparators report equality
+   */
+  @Override
+  public int compare(Tuple t1, Tuple t2) {
+    for(Comparator<Tuple> comp : comps) {
+      int i = comp.compare(t1, t2);
+      if(i != 0) {
+        return i;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Renders the wrapped comparators as a comma-separated expression value.
+   * <p>
+   * NOTE(review): this appends each parameter's {@code toString()}. It assumes
+   * StreamExpressionValue.toString() yields the expression text; confirm.
+   *
+   * @throws IOException if any wrapped comparator is not an {@link ExpressibleComparator}
+   */
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    for(Comparator<Tuple> comp : comps){
+      if(comp instanceof ExpressibleComparator){
+        if(sb.length() > 0){ sb.append(","); }
+        sb.append(((ExpressibleComparator)comp).toExpression(factory));
+      }
+      else{
+        throw new IOException("This MultiComp contains a non-expressible comparator - it cannot be converted to an expression");
+      }
+    }
+
+    return new StreamExpressionValue(sb.toString());
+  }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java Mon May 11 12:37:18 2015
@@ -0,0 +1,45 @@
+package org.apache.solr.client.solrj.io.comp;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Base class for Tuple comparators that sort on a field.
+ * <p>
+ * Holds the field names and the {@link ComparatorOrder} shared by subclasses. Subclasses
+ * such as FieldComparator read {@code leftField} from the left tuple and {@code rightField}
+ * from the right tuple. The single-field constructor uses the same name for both.
+ */
+public abstract class StreamComparator implements Comparator<Tuple>, Serializable {
+  protected String leftField;
+  protected String rightField;
+  protected final ComparatorOrder order;
+
+  /**
+   * Creates a comparator that reads the same field from both tuples.
+   *
+   * @param field the field name used for both tuples
+   * @param order the sort order
+   */
+  public StreamComparator(String field, ComparatorOrder order) {
+    this(field, field, order);
+  }
+
+  /**
+   * Creates a comparator that reads a different field from each tuple.
+   *
+   * @param leftField the field name for the left tuple
+   * @param rightField the field name for the right tuple
+   * @param order the sort order
+   */
+  public StreamComparator(String leftField, String rightField, ComparatorOrder order){
+    this.order = order;
+    this.leftField = leftField;
+    this.rightField = rightField;
+  }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Comparators for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.comp;
+
+
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+/**
+ * Connects to Zookeeper to pick replicas from a specific collection to send the query to.
+ * Under the covers the SolrStream instances send the query to the replicas.
+ * SolrStreams are opened using a thread pool, but a single thread is used
+ * to iterate and merge Tuples from each SolrStream.
+ *
+ * The query params must contain "fl" and "sort", and every sort field must appear in "fl":
+ * the sort spec is turned into the comparator used to merge the per-shard streams.
+ **/
+
+public class CloudSolrStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  protected String zkHost;
+  protected String collection;
+  protected Map<String,String> params;
+  private Map<String, String> fieldMappings;
+  protected Comparator<Tuple> comp;
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  private int numWorkers;
+  private int workerID;
+  private boolean trace;
+  protected transient Map<String, Tuple> eofTuples;
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+  protected transient List<TupleStream> solrStreams;
+  protected transient TreeSet<TupleWrapper> tuples;
+  protected transient StreamContext streamContext;
+
+  // Used by parallel stream
+  protected CloudSolrStream(){
+
+  }
+
+  /**
+   * @param zkHost Zookeeper host string used to create the CloudSolrClient (when no cache is set)
+   * @param collectionName collection whose active slices are queried
+   * @param params query params sent to every shard; must contain "fl" and "sort"
+   * @throws IOException if "fl" or "sort" is missing or the sort spec is invalid
+   */
+  public CloudSolrStream(String zkHost, String collectionName, Map params) throws IOException {
+    init(collectionName, zkHost, params);
+  }
+
+  /**
+   * Builds the stream from an expression of the form
+   * {@code functionName(collectionName, param1=..., ..., [zkHost=...], [aliases="orig=alias,..."])}.
+   * All named parameters other than zkHost and aliases are sent to Solr as query params.
+   * When zkHost is absent it is looked up from the factory by collection name.
+   *
+   * @throws IOException if the expression is malformed or no zkHost can be determined
+   */
+  public CloudSolrStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter aliasExpression = factory.getNamedOperand(expression, "aliases");
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
+    if(expression.getParameters().size() != 1 + namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
+    }
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    Map<String,String> params = new HashMap<String,String>();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
+        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // Aliases, optional, if provided then need to split
+    if(null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue){
+      fieldMappings = new HashMap<String,String>();
+      for(String mapping : ((StreamExpressionValue)aliasExpression.getParameter()).getValue().split(",")){
+        String[] parts = mapping.trim().split("=");
+        if(2 == parts.length){
+          fieldMappings.put(parts[0], parts[1]);
+        }
+        else{
+          throw new IOException(String.format(Locale.ROOT,"invalid expression %s - alias expected of the format origName=newName",expression));
+        }
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items. fieldMappings is already set, so init() builds
+    // the comparator against the aliased field names.
+    init(collectionName, zkHost, params);
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    // parameters
+    for(Entry<String,String> param : params.entrySet()){
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    // aliases
+    if(null != fieldMappings && 0 != fieldMappings.size()){
+      StringBuilder sb = new StringBuilder();
+      for(Entry<String,String> mapping : fieldMappings.entrySet()){
+        if(sb.length() > 0){ sb.append(","); }
+        sb.append(mapping.getKey());
+        sb.append("=");
+        sb.append(mapping.getValue());
+      }
+
+      expression.addParameter(new StreamExpressionNamedParameter("aliases", sb.toString()));
+    }
+
+    return expression;
+  }
+
+  /**
+   * Stores the connection/query settings and builds the merge comparator from the "sort" param.
+   *
+   * @throws IOException if "fl" or "sort" is missing or the sort spec is invalid
+   */
+  private void init(String collectionName, String zkHost, Map params) throws IOException {
+    this.zkHost = zkHost;
+    this.collection = collectionName;
+    this.params = params;
+
+    // The comparator is created from the sort parameter of the query. Any aliases are taken into
+    // account, so if we are sorting on fieldA but fieldA is aliased to alias.fieldA then the
+    // comparator will be against alias.fieldA.
+    if(!params.containsKey("fl")){
+      throw new IOException("fl param expected for a stream");
+    }
+    if(!params.containsKey("sort")){
+      throw new IOException("sort param expected for a stream");
+    }
+    this.comp = parseComp(this.params.get("sort"), this.params.get("fl"));
+  }
+
+  /**
+   * Sets the field aliases (original name to alias) that are handed to each per-shard SolrStream.
+   * When this stream was built from "fl"/"sort" params, the merge comparator is rebuilt so it
+   * compares the aliased field names rather than the original ones.
+   *
+   * @throws IllegalStateException if the comparator can no longer be built from the stored params
+   */
+  public void setFieldMappings(Map<String, String> fieldMappings) {
+    this.fieldMappings = fieldMappings;
+    if(params != null && params.containsKey("sort") && params.containsKey("fl")) {
+      try {
+        this.comp = parseComp(params.get("sort"), params.get("fl"));
+      } catch (IOException e) {
+        // init() already validated sort/fl; this only happens if the params map changed since then.
+        throw new IllegalStateException("Unable to rebuild the comparator for the new field mappings", e);
+      }
+    }
+  }
+
+  /** When enabled, every tuple read (including EOF) gets a "_COLLECTION_" entry with the collection name. */
+  public void setTrace(boolean trace) {
+    this.trace = trace;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.numWorkers = context.numWorkers;
+    this.workerID = context.workerID;
+    this.cache = context.getSolrClientCache();
+    this.streamContext = context;
+  }
+
+  /**
+   * Opens the CloudSolrStream: obtains a CloudSolrClient (from the cache if one was provided,
+   * otherwise a new one), creates one SolrStream per active slice and opens them all.
+   ***/
+  public void open() throws IOException {
+    this.tuples = new TreeSet<>();
+    this.solrStreams = new ArrayList<>();
+    this.eofTuples = Collections.synchronizedMap(new HashMap<String, Tuple>());
+    if(this.cache != null) {
+      this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
+    } else {
+      this.cloudSolrClient = new CloudSolrClient(zkHost);
+      this.cloudSolrClient.connect();
+    }
+    constructStreams();
+    openStreams();
+  }
+
+  /** Returns the EOF tuple of each shard stream, keyed by that stream's base URL. */
+  public Map getEofTuples() {
+    return this.eofTuples;
+  }
+
+  public List<TupleStream> children() {
+    return solrStreams;
+  }
+
+  /**
+   * Builds a comparator from a Solr sort spec such as {@code "a asc, b desc"}.
+   * Every sort field must be present in {@code fl}; aliased fields are compared by their alias.
+   * Any order other than "asc" (case-insensitive) is treated as descending.
+   *
+   * @throws IOException if a sort field is not in the field list or a clause lacks an order
+   */
+  private Comparator<Tuple> parseComp(String sort, String fl) throws IOException {
+
+    String[] fls = fl.split(",");
+    HashSet<String> fieldSet = new HashSet<>();
+    for(String f : fls) {
+      fieldSet.add(f.trim()); //Handle spaces in the field list.
+    }
+
+    String[] sorts = sort.split(",");
+    Comparator[] comps = new Comparator[sorts.length];
+    for(int i=0; i<sorts.length; i++) {
+      String s = sorts[i];
+
+      String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+      if(spec.length < 2) {
+        // Previously surfaced as an ArrayIndexOutOfBoundsException.
+        throw new IOException("Invalid sort spec, expected '<field> <asc|desc>' but got:" + s);
+      }
+
+      String fieldName = spec[0].trim();
+      String order = spec[1].trim();
+
+      if(!fieldSet.contains(spec[0])) {
+        throw new IOException("Fields in the sort spec must be included in the field list:"+spec[0]);
+      }
+
+      // if there's an alias for the field then use the alias
+      if(null != fieldMappings && fieldMappings.containsKey(fieldName)){
+        fieldName = fieldMappings.get(fieldName);
+      }
+
+      comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+    }
+
+    if(comps.length > 1) {
+      return new MultiComp(comps);
+    } else {
+      return comps[0];
+    }
+  }
+
+  /**
+   * Creates one SolrStream per active slice, each pointed at a randomly chosen replica of that slice.
+   * The per-shard requests are sent with distrib=false because this stream does the merging.
+   */
+  protected void constructStreams() throws IOException {
+
+    try {
+
+      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+
+      Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+      if(slices == null) {
+        throw new IOException("Collection not found:" + this.collection);
+      }
+      long time = System.currentTimeMillis();
+
+      // We are the aggregator. Use a copy so neither the caller's map nor the params reported by
+      // toExpression() pick up distrib=false as a side effect of opening the stream.
+      Map<String,String> shardParams = new HashMap<>(params);
+      shardParams.put("distrib","false");
+
+      for(Slice slice : slices) {
+        List<Replica> shuffler = new ArrayList<>(slice.getReplicas());
+
+        Collections.shuffle(shuffler, new Random(time));
+        Replica rep = shuffler.get(0);
+        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+        String url = zkProps.getCoreUrl();
+        SolrStream solrStream = new SolrStream(url, shardParams);
+        if(streamContext != null) {
+          solrStream.setStreamContext(streamContext);
+        }
+        solrStream.setFieldMappings(this.fieldMappings);
+        solrStreams.add(solrStream);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Opens all shard streams concurrently and seeds the merge set with each stream's first tuple.
+   * Streams that are empty (EOF on the first read) are not added.
+   */
+  private void openStreams() throws IOException {
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("CloudSolrStream"));
+    try {
+      List<Future<TupleWrapper>> futures = new ArrayList<>();
+      for (TupleStream solrStream : solrStreams) {
+        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
+        Future<TupleWrapper> future = service.submit(so);
+        futures.add(future);
+      }
+
+      try {
+        for (Future<TupleWrapper> f : futures) {
+          TupleWrapper w = f.get();
+          if (w != null) {
+            tuples.add(w);
+          }
+        }
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for the caller.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } finally {
+      service.shutdown();
+    }
+  }
+
+  /**
+   * Closes the CloudSolrStream. Safe to call even if {@link #open()} was never called or failed
+   * part-way. The CloudSolrClient is closed only when this stream created it (no cache was set),
+   * and it is closed even if closing one of the shard streams fails.
+   **/
+  public void close() throws IOException {
+    try {
+      if(solrStreams != null) {
+        for(TupleStream solrStream : solrStreams) {
+          solrStream.close();
+        }
+      }
+    } finally {
+      if(cache == null && cloudSolrClient != null) {
+        cloudSolrClient.close();
+      }
+    }
+  }
+
+  public Tuple read() throws IOException {
+    return _read();
+  }
+
+  /**
+   * Returns the smallest pending tuple across all shards (per {@link #comp}) and advances that
+   * shard's stream, or an EOF tuple once every shard stream is exhausted.
+   */
+  protected Tuple _read() throws IOException {
+    TupleWrapper tw = tuples.pollFirst();
+    if(tw != null) {
+      Tuple t = tw.getTuple();
+
+      if (trace) {
+        t.put("_COLLECTION_", this.collection);
+      }
+
+      // Re-insert the wrapper only if its stream produced another (non-EOF) tuple.
+      if(tw.next()) {
+        tuples.add(tw);
+      }
+      return t;
+    } else {
+      Map m = new HashMap();
+      if(trace) {
+        m.put("_COLLECTION_", this.collection);
+      }
+
+      m.put("EOF", true);
+
+      return new Tuple(m);
+    }
+  }
+
+  /** Pairs a shard stream with its current head tuple so the heads can be ordered in a TreeSet. */
+  protected class TupleWrapper implements Comparable<TupleWrapper> {
+    private Tuple tuple;
+    private SolrStream stream;
+    private Comparator comp;
+
+    public TupleWrapper(SolrStream stream, Comparator comp) {
+      this.stream = stream;
+      this.comp = comp;
+    }
+
+    @Override
+    public int compareTo(TupleWrapper w) {
+      if(this == w) {
+        return 0;
+      }
+
+      int i = comp.compare(tuple, w.tuple);
+      // Never report distinct wrappers as equal: the TreeSet would otherwise drop a shard whose
+      // head tuple ties with another shard's head on the sort fields.
+      if(i == 0) {
+        return 1;
+      } else {
+        return i;
+      }
+    }
+
+    public boolean equals(Object o) {
+      return this == o;
+    }
+
+    public Tuple getTuple() {
+      return tuple;
+    }
+
+    /**
+     * Reads the next tuple from the underlying stream. On EOF the tuple is recorded in
+     * eofTuples under the stream's base URL.
+     *
+     * @return true if a non-EOF tuple was read
+     */
+    public boolean next() throws IOException {
+      this.tuple = stream.read();
+
+      if(tuple.EOF) {
+        eofTuples.put(stream.getBaseUrl(), tuple);
+      }
+
+      return !tuple.EOF;
+    }
+  }
+
+  /** Opens one shard stream and reads its first tuple; returns null when the stream is empty. */
+  protected class StreamOpener implements Callable<TupleWrapper> {
+
+    private SolrStream stream;
+    private Comparator<Tuple> comp;
+
+    public StreamOpener(SolrStream stream, Comparator<Tuple> comp) {
+      this.stream = stream;
+      this.comp = comp;
+    }
+
+    public TupleWrapper call() throws Exception {
+      stream.open();
+      TupleWrapper wrapper = new TupleWrapper(stream, comp);
+      if(wrapper.next()) {
+        return wrapper;
+      } else {
+        return null;
+      }
+    }
+  }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,30 @@
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A stream that can describe itself as a streaming expression.
+ */
+public interface ExpressibleStream {
+
+  /**
+   * Converts this stream into its expression representation.
+   *
+   * @param streamFactory factory used, for example, to look up the function name registered
+   *                      for the implementing class
+   * @return the expression describing this stream
+   * @throws IOException if this stream, or a component it holds, cannot be expressed
+   */
+  StreamExpressionParameter toExpression(StreamFactory streamFactory) throws IOException;
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,165 @@
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.noggit.JSONParser;
+import org.noggit.ObjectBuilder;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ Queries a Solr instance, and maps SolrDocs to Tuples.
+ Initial version works with the json format and only SolrDocs are handled.
+*/
+
+public class JSONTupleStream {
+ private List<String> path; // future... for more general stream handling
+ private Reader reader;
+ private JSONParser parser;
+ private boolean atDocs;
+
+ public JSONTupleStream(Reader reader) {
+ this.reader = reader;
+ this.parser = new JSONParser(reader);
+ }
+
+ // temporary...
+ public static JSONTupleStream create(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+ String p = requestParams.get("qt");
+ if(p != null) {
+ ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
+ modifiableSolrParams.remove("qt");
+ }
+
+ QueryRequest query = new QueryRequest( requestParams );
+ query.setPath(p);
+ query.setResponseParser(new InputStreamResponseParser("json"));
+ query.setMethod(SolrRequest.METHOD.POST);
+ NamedList<Object> genericResponse = server.request(query);
+ InputStream stream = (InputStream)genericResponse.get("stream");
+ InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
+ return new JSONTupleStream(reader);
+ }
+
+
+ /** returns the next Tuple or null */
+ public Map<String,Object> next() throws IOException {
+ if (!atDocs) {
+ boolean found = advanceToDocs();
+ atDocs = true;
+ if (!found) return null;
+ }
+ // advance past ARRAY_START (in the case that we just advanced to docs, or OBJECT_END left over from the last call.
+ int event = parser.nextEvent();
+ if (event == JSONParser.ARRAY_END) return null;
+
+ Object o = ObjectBuilder.getVal(parser);
+ // right now, getVal will leave the last event read as OBJECT_END
+
+ return (Map<String,Object>)o;
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+
+
+ private void expect(int parserEventType) throws IOException {
+ int event = parser.nextEvent();
+ if (event != parserEventType) {
+ throw new IOException("JSONTupleStream: expected " + JSONParser.getEventString(parserEventType) + " but got " + JSONParser.getEventString(event) );
+ }
+ }
+
+ private void expect(String mapKey) {
+
+
+ }
+
+ private boolean advanceToMapKey(String key, boolean deepSearch) throws IOException {
+ for (;;) {
+ int event = parser.nextEvent();
+ switch (event) {
+ case JSONParser.STRING:
+ if (key != null) {
+ String val = parser.getString();
+ if (key.equals(val)) {
+ return true;
+ }
+ }
+ break;
+ case JSONParser.OBJECT_END:
+ return false;
+ case JSONParser.OBJECT_START:
+ if (deepSearch) {
+ boolean found = advanceToMapKey(key, true);
+ if (found) {
+ return true;
+ }
+ } else {
+ advanceToMapKey(null, false);
+ }
+ break;
+ case JSONParser.ARRAY_START:
+ skipArray(key, deepSearch);
+ break;
+ }
+ }
+ }
+
+ private void skipArray(String key, boolean deepSearch) throws IOException {
+ for (;;) {
+ int event = parser.nextEvent();
+ switch (event) {
+ case JSONParser.OBJECT_START:
+ advanceToMapKey(key, deepSearch);
+ break;
+ case JSONParser.ARRAY_START:
+ skipArray(key, deepSearch);
+ break;
+ case JSONParser.ARRAY_END:
+ return;
+ }
+ }
+ }
+
+
+ private boolean advanceToDocs() throws IOException {
+ expect(JSONParser.OBJECT_START);
+ boolean found = advanceToMapKey("docs", true);
+ expect(JSONParser.ARRAY_START);
+ return found;
+ }
+
+
+
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.Map.Entry;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+* Unions streamA with streamB ordering the Tuples based on a Comparator.
+* Both streams must be sorted by the fields being compared.
+**/
+
+
+public class MergeStream extends TupleStream implements ExpressibleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private PushBackStream streamA;
+ private PushBackStream streamB;
+ private Comparator<Tuple> comp;
+
+ public MergeStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
+ this.streamA = new PushBackStream(streamA);
+ this.streamB = new PushBackStream(streamB);
+ this.comp = comp;
+ }
+
+ public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
+ // grab all parameters out
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+ StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on");
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size() + 1){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(2 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
+ }
+ this.streamA = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
+ this.streamB = new PushBackStream(factory.constructStream(streamExpressions.get(1)));
+
+ if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
+ }
+
+ // Merge is always done over equality, so always use an EqualTo comparator
+ this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class);
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // streams
+ expression.addParameter(streamA.toExpression(factory));
+ expression.addParameter(streamB.toExpression(factory));
+
+ // on
+ if(comp instanceof ExpressibleComparator){
+ expression.addParameter(new StreamExpressionNamedParameter("on",((ExpressibleComparator)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.streamA.setStreamContext(context);
+ this.streamB.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ l.add(streamA);
+ l.add(streamB);
+ return l;
+ }
+
+ public void open() throws IOException {
+ streamA.open();
+ streamB.open();
+ }
+
+ public void close() throws IOException {
+ streamA.close();
+ streamB.close();
+ }
+
+ public Tuple read() throws IOException {
+ Tuple a = streamA.read();
+ Tuple b = streamB.read();
+
+ if(a.EOF && b.EOF) {
+ return a;
+ }
+
+ if(a.EOF) {
+ streamA.pushBack(a);
+ return b;
+ }
+
+ if(b.EOF) {
+ streamB.pushBack(b);
+ return a;
+ }
+
+ int c = comp.compare(a,b);
+
+ if(c < 0) {
+ streamB.pushBack(b);
+ return a;
+ } else {
+ streamA.pushBack(a);
+ return b;
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.io.ByteArrayOutputStream;
+import java.util.Random;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Base64;
+
+/**
+ * The ParallelStream decorates a TupleStream implementation and pushes it to N workers for parallel execution.
+ * Workers are chosen from a SolrCloud collection.
+ * Tuples that are streamed back from the workers are ordered by a Comparator.
+ **/
+
+
+public class ParallelStream extends CloudSolrStream implements ExpressibleStream {
+
+ private TupleStream tupleStream;
+ private int workers;
+ private boolean objectSerialize = true;
+ private transient StreamFactory streamFactory;
+
+ public ParallelStream(String zkHost,
+ String collection,
+ TupleStream tupleStream,
+ int workers,
+ Comparator<Tuple> comp) throws IOException {
+ init(zkHost,collection,tupleStream,workers,comp);
+ }
+
+
+ public ParallelStream(String zkHost,
+ String collection,
+ String expressionString,
+ int workers,
+ Comparator<Tuple> comp) throws IOException {
+ objectSerialize = false;
+ TupleStream tStream = this.streamFactory.constructStream(expressionString);
+ init(zkHost,collection, tStream, workers,comp);
+ }
+
+ public ParallelStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ // grab all parameters out
+ objectSerialize = false;
+ String collectionName = factory.getValueOperand(expression, 0);
+ StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers");
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+ StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort");
+ StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+ // validate expression contains only what we want.
+
+ if(expression.getParameters().size() != streamExpressions.size() + 3 + (null != zkHostExpression ? 1 : 0)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ // Collection Name
+ if(null == collectionName){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+ }
+
+ // Workers
+ if(null == workersParam || null == workersParam.getParameter() || !(workersParam.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'workersParam' parameter of type positive integer but didn't find one",expression));
+ }
+ String workersStr = ((StreamExpressionValue)workersParam.getParameter()).getValue();
+ int workersInt = 0;
+ try{
+ workersInt = Integer.parseInt(workersStr);
+ if(workersInt <= 0){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' must be greater than 0.",expression, workersStr));
+ }
+ }
+ catch(NumberFormatException e){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - workers '%s' is not a valid integer.",expression, workersStr));
+ }
+
+ // Stream
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+
+ // Sort
+ if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'sort' parameter telling us how to join the parallel streams but didn't find one",expression));
+ }
+
+ // zkHost, optional - if not provided then will look into factory list to get
+ String zkHost = null;
+ if(null == zkHostExpression){
+ zkHost = factory.getCollectionZkHost(collectionName);
+ }
+ else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+ zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+ }
+ if(null == zkHost){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+ }
+
+ // We've got all the required items
+ TupleStream stream = factory.constructStream(streamExpressions.get(0));
+ Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+ streamFactory = factory;
+ init(zkHost,collectionName,stream,workersInt,comp);
+ }
+
+ private void init(String zkHost,String collection,TupleStream tupleStream,int workers,Comparator<Tuple> comp) throws IOException{
+ this.zkHost = zkHost;
+ this.collection = collection;
+ this.workers = workers;
+ this.comp = comp;
+ this.tupleStream = tupleStream;
+
+ // requires Expressible stream and comparator
+ if(!objectSerialize && !(tupleStream instanceof ExpressibleStream)){
+ throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
+ }
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // collection
+ expression.addParameter(collection);
+
+ // workers
+ expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers)));
+
+ // stream
+ if(tupleStream instanceof ExpressibleStream){
+ expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+ }
+ else{
+ throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+ }
+
+ // sort
+ if(comp instanceof ExpressibleComparator){
+ expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
+
+ // zkHost
+ expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+ return expression;
+ }
+
+ public List<TupleStream> children() {
+ List l = new ArrayList();
+ l.add(tupleStream);
+ return l;
+ }
+
+ public Tuple read() throws IOException {
+ Tuple tuple = _read();
+
+ if(tuple.EOF) {
+ Map m = new HashMap();
+ m.put("EOF", true);
+ Tuple t = new Tuple(m);
+
+ Map<String, Map> metrics = new HashMap();
+ Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
+ while(it.hasNext()) {
+ Map.Entry<String, Tuple> entry = it.next();
+ metrics.put(entry.getKey(), entry.getValue().fields);
+ }
+
+ t.setMetrics(metrics);
+ return t;
+ }
+
+ return tuple;
+ }
+
+ public void setStreamContext(StreamContext streamContext) {
+ this.streamContext = streamContext;
+ if(streamFactory == null) {
+ this.streamFactory = streamContext.getStreamFactory();
+ }
+ this.tupleStream.setStreamContext(streamContext);
+ }
+
+ protected void constructStreams() throws IOException {
+
+ try {
+ Object pushStream = null;
+
+ if (objectSerialize) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bout);
+ out.writeObject(tupleStream);
+ byte[] bytes = bout.toByteArray();
+ String encoded = Base64.byteArrayToBase64(bytes, 0, bytes.length);
+ pushStream = URLEncoder.encode(encoded, "UTF-8");
+ } else {
+ pushStream = ((ExpressibleStream) tupleStream).toExpression(streamFactory);
+ }
+
+ ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ ClusterState clusterState = zkStateReader.getClusterState();
+ Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+ long time = System.currentTimeMillis();
+ List<Replica> shuffler = new ArrayList();
+ for(Slice slice : slices) {
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ shuffler.add(replica);
+ }
+ }
+
+ if(workers > shuffler.size()) {
+ throw new IOException("Number of workers exceeds nodes in the worker collection");
+ }
+
+ Collections.shuffle(shuffler, new Random(time));
+
+ for(int w=0; w<workers; w++) {
+ HashMap params = new HashMap();
+ params.put("distrib","false"); // We are the aggregator.
+ params.put("numWorkers", workers);
+ params.put("workerID", w);
+ params.put("stream", pushStream);
+ params.put("qt","/stream");
+ params.put("objectSerialize", objectSerialize);
+ Replica rep = shuffler.get(w);
+ ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+ String url = zkProps.getCoreUrl();
+ SolrStream solrStream = new SolrStream(url, params);
+ solrStreams.add(solrStream);
+ }
+
+ assert(solrStreams.size() == workers);
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void setObjectSerialize(boolean objectSerialize) {
+ this.objectSerialize = objectSerialize;
+ }
+
+ public boolean getObjectSerialize() {
+ return objectSerialize;
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * A TupleStream that allows a single Tuple to be pushed back onto the stream after it's been read.
+ * This is a useful class when building streams that maintain the order of Tuples between multiple
+ * substreams.
+ **/
+
+public class PushBackStream extends TupleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private TupleStream stream;
+ private Tuple tuple;
+
+ public PushBackStream(TupleStream stream) {
+ this.stream = stream;
+ }
+
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException{
+ if(stream instanceof ExpressibleStream){
+ return ((ExpressibleStream)stream).toExpression(factory);
+ }
+
+ throw new IOException("This PushBackStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.stream.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ l.add(stream);
+ return l;
+ }
+
+ public void open() throws IOException {
+ stream.open();
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ public void pushBack(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public Tuple read() throws IOException {
+ if(tuple != null) {
+ Tuple t = tuple;
+ tuple = null;
+ return t;
+ } else {
+ return stream.read();
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.PriorityQueue;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+* Iterates over a TupleStream and Ranks the topN tuples based on a Comparator.
+**/
+
+public class RankStream extends TupleStream implements ExpressibleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private TupleStream tupleStream;
+ private Comparator<Tuple> comp;
+ private int size;
+ private transient PriorityQueue<Tuple> top;
+ private transient boolean finished = false;
+ private transient LinkedList<Tuple> topList;
+
+ public RankStream(TupleStream tupleStream, int size, Comparator<Tuple> comp) {
+ init(tupleStream,size,comp);
+ }
+
+ public RankStream(StreamExpression expression, StreamFactory factory) throws IOException {
+ // grab all parameters out
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+ StreamExpressionNamedParameter nParam = factory.getNamedOperand(expression, "n");
+ StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort");
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size() + 2){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(null == nParam || null == nParam.getParameter() || !(nParam.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single 'n' parameter of type positive integer but didn't find one",expression));
+ }
+ String nStr = ((StreamExpressionValue)nParam.getParameter()).getValue();
+ int nInt = 0;
+ try{
+ nInt = Integer.parseInt(nStr);
+ if(nInt <= 0){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' must be greater than 0.",expression, nStr));
+ }
+ }
+ catch(NumberFormatException e){
+ throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' is not a valid integer.",expression, nStr));
+ }
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+ if(null == sortExpression || !(sortExpression.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
+ }
+
+ TupleStream stream = factory.constructStream(streamExpressions.get(0));
+ Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
+
+ init(stream,nInt,comp);
+ }
+
+ private void init(TupleStream tupleStream, int size, Comparator<Tuple> comp){
+ this.tupleStream = tupleStream;
+ this.comp = comp;
+ this.size = size;
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // n
+ expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
+
+ // stream
+ if(tupleStream instanceof ExpressibleStream){
+ expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+ }
+ else{
+ throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+ }
+
+ // sort
+ if(comp instanceof ExpressibleComparator){
+ expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.tupleStream.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ l.add(tupleStream);
+ return l;
+ }
+
+ public void open() throws IOException {
+ this.top = new PriorityQueue(size, new ReverseComp(comp));
+ this.topList = new LinkedList();
+ tupleStream.open();
+ }
+
+ public void close() throws IOException {
+ tupleStream.close();
+ }
+
+ public Comparator<Tuple> getComparator(){
+ return this.comp;
+ }
+
+ public Tuple read() throws IOException {
+ if(!finished) {
+ while(true) {
+ Tuple tuple = tupleStream.read();
+ if(tuple.EOF) {
+ finished = true;
+ int s = top.size();
+ for(int i=0; i<s; i++) {
+ Tuple t = top.poll();
+ topList.addFirst(t);
+ }
+ topList.addLast(tuple);
+ break;
+ } else {
+ Tuple peek = top.peek();
+ if(top.size() >= size) {
+ if(comp.compare(tuple, peek) < 0) {
+ top.poll();
+ top.add(tuple);
+ }
+ } else {
+ top.add(tuple);
+ }
+ }
+ }
+ }
+
+ return topList.pollFirst();
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+ class ReverseComp implements Comparator<Tuple>, Serializable {
+
+ private Comparator<Tuple> comp;
+
+ public ReverseComp(Comparator<Tuple> comp) {
+ this.comp = comp;
+ }
+
+ public int compare(Tuple t1, Tuple t2) {
+ return comp.compare(t1, t2)*(-1);
+ }
+ }
+}
\ No newline at end of file