You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/11 14:37:19 UTC
svn commit: r1678743 [2/3] - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/handler/
core/src/java/org/apache/solr/response/
server/solr/configsets/data_driven_schema_configs/conf/
solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/jav...
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Iterates over a TupleStream and buffers Tuples that are equal based on a comparator.
+ * This allows tuples to be grouped by common field(s).
+ *
+ * The read() method emits one tuple per group. The fields of the emitted Tuple reflect the first tuple
+ * encountered in the group.
+ *
+ * Use the Tuple.getMaps() method to return all the Tuples in the group. This method returns
+ * a list of maps (including the group head), which hold the data for each Tuple in the group.
+ *
+ * Note: The ReducerStream requires that it's underlying stream be sorted and partitioned by the same
+ * fields as it's comparator.
+ *
+ **/
+
+public class ReducerStream extends TupleStream implements ExpressibleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private PushBackStream tupleStream;
+ private Comparator<Tuple> comp;
+
+ private transient Tuple currentGroupHead;
+
+ public ReducerStream(TupleStream tupleStream,
+ Comparator<Tuple> comp) {
+ this.tupleStream = new PushBackStream(tupleStream);
+ this.comp = comp;
+ }
+
+ public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
+ // grab all parameters out
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+ StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size() + 1){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+ this.tupleStream = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
+
+ if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
+ }
+
+ // Reducing is always done over equality, so always use an EqualTo comparator
+ this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class);
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // stream
+ expression.addParameter(tupleStream.toExpression(factory));
+
+ // over
+ if(comp instanceof ExpressibleComparator){
+ expression.addParameter(new StreamExpressionNamedParameter("by",((ExpressibleComparator)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.tupleStream.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ l.add(tupleStream);
+ return l;
+ }
+
+ public void open() throws IOException {
+ tupleStream.open();
+ }
+
+ public void close() throws IOException {
+ tupleStream.close();
+ }
+
+ public Tuple read() throws IOException {
+
+ List<Map> maps = new ArrayList();
+ while(true) {
+ Tuple t = tupleStream.read();
+
+ if(t.EOF) {
+ if(maps.size() > 0) {
+ tupleStream.pushBack(t);
+ Map map1 = maps.get(0);
+ Map map2 = new HashMap();
+ map2.putAll(map1);
+ Tuple groupHead = new Tuple(map2);
+ groupHead.setMaps(maps);
+ return groupHead;
+ } else {
+ return t;
+ }
+ }
+
+ if(currentGroupHead == null) {
+ currentGroupHead = t;
+ maps.add(t.getMap());
+ } else {
+ if(comp.compare(currentGroupHead, t) == 0) {
+ maps.add(t.getMap());
+ } else {
+ Tuple groupHead = currentGroupHead.clone();
+ tupleStream.pushBack(t);
+ currentGroupHead = null;
+ groupHead.setMaps(maps);
+ return groupHead;
+ }
+ }
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+* Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
+**/
+
+public class SolrStream extends TupleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private String baseUrl;
+ private Map params;
+ private int numWorkers;
+ private int workerID;
+ private boolean trace;
+ private Map<String, String> fieldMappings;
+ private transient JSONTupleStream jsonTupleStream;
+ private transient HttpSolrClient client;
+ private transient SolrClientCache cache;
+
+ public SolrStream(String baseUrl, Map params) {
+ this.baseUrl = baseUrl;
+ this.params = params;
+ }
+
+ public void setFieldMappings(Map<String, String> fieldMappings) {
+ this.fieldMappings = fieldMappings;
+ }
+
+ public List<TupleStream> children() {
+ return new ArrayList();
+ }
+
+ public String getBaseUrl() {
+ return baseUrl;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.numWorkers = context.numWorkers;
+ this.workerID = context.workerID;
+ this.cache = context.getSolrClientCache();
+ }
+
+ /**
+ * Opens the stream to a single Solr instance.
+ **/
+
+ public void open() throws IOException {
+
+ if(cache == null) {
+ client = new HttpSolrClient(baseUrl);
+ } else {
+ client = cache.getHttpSolrClient(baseUrl);
+ }
+
+ try {
+ jsonTupleStream = JSONTupleStream.create(client, loadParams(params));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Setting trace to true will include the "_CORE_" field in each Tuple emitted by the stream.
+ **/
+
+ public void setTrace(boolean trace) {
+ this.trace = trace;
+ }
+
+ private SolrParams loadParams(Map params) throws IOException {
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ if(params.containsKey("partitionKeys")) {
+ if(!params.get("partitionKeys").equals("none")) {
+ String partitionFilter = getPartitionFilter();
+ solrParams.add("fq", partitionFilter);
+ }
+ } else {
+ if(numWorkers > 1) {
+ throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
+ }
+ }
+
+ Iterator<Map.Entry> it = params.entrySet().iterator();
+ while(it.hasNext()) {
+ Map.Entry entry = it.next();
+ solrParams.add((String)entry.getKey(), entry.getValue().toString());
+ }
+
+ return solrParams;
+ }
+
+ private String getPartitionFilter() {
+ StringBuilder buf = new StringBuilder("{!hash workers=");
+ buf.append(this.numWorkers);
+ buf.append(" worker=");
+ buf.append(this.workerID);
+ buf.append("}");
+ return buf.toString();
+ }
+
+ /**
+ * Closes the Stream to a single Solr Instance
+ * */
+
+ public void close() throws IOException {
+ jsonTupleStream.close();
+ if(cache == null) {
+ client.close();
+ }
+ }
+
+ /**
+ * Reads a Tuple from the stream. The Stream is completed when Tuple.EOF == true.
+ **/
+
+ public Tuple read() throws IOException {
+ Map fields = jsonTupleStream.next();
+
+ if(trace) {
+ fields.put("_CORE_", this.baseUrl);
+ }
+
+ if(fields == null) {
+ //Return the EOF tuple.
+ Map m = new HashMap();
+ m.put("EOF", true);
+ return new Tuple(m);
+ } else {
+ if(fieldMappings != null) {
+ fields = mapFields(fields, fieldMappings);
+ }
+ return new Tuple(fields);
+ }
+ }
+
+ private Map mapFields(Map fields, Map<String,String> mappings) {
+
+ Iterator<Map.Entry<String,String>> it = mappings.entrySet().iterator();
+ while(it.hasNext()) {
+ Map.Entry<String,String> entry = it.next();
+ String mapFrom = entry.getKey();
+ String mapTo = entry.getValue();
+ Object o = fields.get(mapFrom);
+ fields.remove(mapFrom);
+ fields.put(mapTo, o);
+ }
+
+ return fields;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java Mon May 11 12:37:18 2015
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
+ * The StreamContext is used to pass shared context to concentrically wrapped TupleStreams.
+ *
+ * Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for reuse
+ * across multiple TupleStreams.
+ **/
+
+
+public class StreamContext implements Serializable{
+
+ private Map entries = new HashMap();
+ public int workerID;
+ public int numWorkers;
+ private SolrClientCache clientCache;
+ private StreamFactory streamFactory;
+
+ public Object get(Object key) {
+ return entries.get(key);
+ }
+
+ public void put(Object key, Object value) {
+ this.entries.put(key, value);
+ }
+
+ public void setSolrClientCache(SolrClientCache clientCache) {
+ this.clientCache = clientCache;
+ }
+
+ public SolrClientCache getSolrClientCache() {
+ return this.clientCache;
+ }
+
+ public void setStreamFactory(StreamFactory streamFactory) {
+ this.streamFactory = streamFactory;
+ }
+
+ public StreamFactory getStreamFactory() {
+ return this.streamFactory;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public abstract class TupleStream implements Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ public TupleStream() {
+
+ }
+
+ public abstract void setStreamContext(StreamContext context);
+
+ public abstract List<TupleStream> children();
+
+ public abstract void open() throws IOException;
+
+ public abstract void close() throws IOException;
+
+ public abstract Tuple read() throws IOException;
+
+ public int getCost() {
+ return 0;
+ }
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ * The UniqueStream emits a unique stream of Tuples based on a Comparator.
+ *
+ * Note: The sort order of the underlying stream must match the Comparator.
+ **/
+
+public class UniqueStream extends TupleStream implements ExpressibleStream {
+
+ private static final long serialVersionUID = 1;
+
+ private TupleStream tupleStream;
+ private Comparator<Tuple> comp;
+ private transient Tuple currentTuple;
+
+ public UniqueStream(TupleStream tupleStream, Comparator<Tuple> comp) {
+ this.tupleStream = tupleStream;
+ this.comp = comp;
+ }
+
+ public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException {
+ // grab all parameters out
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+ StreamExpressionNamedParameter overExpression = factory.getNamedOperand(expression, "over");
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size() + 1){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+ this.tupleStream = factory.constructStream(streamExpressions.get(0));
+
+ if(null == overExpression || !(overExpression.getParameter() instanceof StreamExpressionValue)){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
+ }
+
+ // Uniqueness is always done over equality, so always use an EqualTo comparator
+ this.comp = factory.constructComparator(((StreamExpressionValue)overExpression.getParameter()).getValue(), FieldComparator.class);
+ }
+
+ @Override
+ public StreamExpression toExpression(StreamFactory factory) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // streams
+ if(tupleStream instanceof ExpressibleStream){
+ expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+ }
+ else{
+ throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+ }
+
+ // over
+ if(comp instanceof ExpressibleComparator){
+ expression.addParameter(new StreamExpressionNamedParameter("over",((ExpressibleComparator)comp).toExpression(factory)));
+ }
+ else{
+ throw new IOException("This UniqueStream contains a non-expressible comparator - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ public void setComp(Comparator<Tuple> comp) {
+ this.comp = comp;
+ }
+
+ public void setStreamContext(StreamContext context) {
+ this.tupleStream.setStreamContext(context);
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList<TupleStream>();
+ l.add(tupleStream);
+ return l;
+ }
+
+ public void open() throws IOException {
+ tupleStream.open();
+ }
+
+ public void close() throws IOException {
+ tupleStream.close();
+ }
+
+ public Tuple read() throws IOException {
+ Tuple tuple = tupleStream.read();
+ if(tuple.EOF) {
+ return tuple;
+ }
+
+ if(currentTuple == null) {
+ currentTuple = tuple;
+ return tuple;
+ } else {
+ while(true) {
+ int i = comp.compare(currentTuple, tuple);
+ if(i == 0) {
+ //We have duplicate tuple so read the next tuple from the stream.
+ tuple = tupleStream.read();
+ if(tuple.EOF) {
+ return tuple;
+ }
+ } else {
+ //We have a non duplicate
+ this.currentTuple = tuple;
+ return tuple;
+ }
+ }
+ }
+ }
+
+ public int getCost() {
+ return 0;
+ }
+
+}
\ No newline at end of file
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java Mon May 11 12:37:18 2015
@@ -0,0 +1,127 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Expression containing a function and set of parameters
+ */
+public class StreamExpression implements StreamExpressionParameter {
+ private String functionName;
+ private List<StreamExpressionParameter> parameters;
+
+ public StreamExpression(String functionName){
+ this.functionName = functionName;
+ parameters = new ArrayList<StreamExpressionParameter>();
+ }
+
+ public String getFunctionName(){
+ return this.functionName;
+ }
+ public void setFunctionName(String functionName){
+ if(null == functionName){
+ throw new IllegalArgumentException("Null functionName is not allowed.");
+ }
+
+ this.functionName = functionName;
+ }
+ public StreamExpression withFunctionName(String functionName){
+ setFunctionName(functionName);
+ return this;
+ }
+
+ public void addParameter(StreamExpressionParameter parameter){
+ this.parameters.add(parameter);
+ }
+ public void addParameter(String parameter){
+ addParameter(new StreamExpressionValue(parameter));
+ }
+
+ public StreamExpression withParameter(StreamExpressionParameter parameter){
+ this.parameters.add(parameter);
+ return this;
+ }
+ public StreamExpression withParameter(String parameter){
+ return withParameter(new StreamExpressionValue(parameter));
+ }
+
+ public List<StreamExpressionParameter> getParameters(){
+ return this.parameters;
+ }
+ public void setParameters(List<StreamExpressionParameter> parameters){
+ if(null == parameters){
+ throw new IllegalArgumentException("Null parameter list is not allowed.");
+ }
+
+ this.parameters = parameters;
+ }
+ public StreamExpression withParameters(List<StreamExpressionParameter> parameters){
+ setParameters(parameters);
+ return this;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(this.functionName);
+
+ sb.append("(");
+ for(int idx = 0; idx < parameters.size(); ++idx){
+ if(0 != idx){ sb.append(","); }
+ sb.append(parameters.get(idx));
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object other){
+ if(other.getClass() != StreamExpression.class){
+ return false;
+ }
+
+ StreamExpression check = (StreamExpression)other;
+
+ if(null == this.functionName && null != check.functionName){
+ return false;
+ }
+ if(null != this.functionName && null == check.functionName){
+ return false;
+ }
+
+ if(null != this.functionName && null != check.functionName && !this.functionName.equals(check.functionName)){
+ return false;
+ }
+
+ if(this.parameters.size() != check.parameters.size()){
+ return false;
+ }
+
+ for(int idx = 0; idx < this.parameters.size(); ++idx){
+ StreamExpressionParameter left = this.parameters.get(idx);
+ StreamExpressionParameter right = check.parameters.get(idx);
+ if(!left.equals(right)){
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java Mon May 11 12:37:18 2015
@@ -0,0 +1,111 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides a named parameter
+ */
+public class StreamExpressionNamedParameter implements StreamExpressionParameter {
+ private String name;
+ private StreamExpressionParameter parameter;
+
+ public StreamExpressionNamedParameter(String name){
+ this.name = name;
+ }
+ public StreamExpressionNamedParameter(String name, String parameter){
+ this.name = name;
+ setParameter(parameter);
+ }
+ public StreamExpressionNamedParameter(String name, StreamExpressionParameter parameter){
+ this.name = name;
+ setParameter(parameter);
+ }
+
+ public String getName(){
+ return this.name;
+ }
+ public void setName(String name){
+ if(null == name || 0 == name.length()){
+ throw new IllegalArgumentException("Null or empty name is not allowed is not allowed.");
+ }
+
+ this.name = name;
+ }
+
+ public StreamExpressionParameter getParameter(){
+ return this.parameter;
+ }
+ public void setParameter(StreamExpressionParameter parameter){
+ this.parameter = parameter;
+ }
+ public StreamExpressionNamedParameter withParameter(StreamExpressionParameter parameter){
+ setParameter(parameter);
+ return this;
+ }
+ public void setParameter(String parameter){
+ this.parameter = new StreamExpressionValue(parameter);
+ }
+ public StreamExpressionNamedParameter withParameter(String parameter){
+ setParameter(parameter);
+ return this;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(name);
+ sb.append("=");
+
+ // check if we require quoting
+ boolean requiresQuote = false;
+ if(parameter instanceof StreamExpressionValue){
+ String value = ((StreamExpressionValue)parameter).getValue();
+ requiresQuote = !StreamExpressionParser.wordToken(value);
+ }
+
+ if(requiresQuote){ sb.append("\""); }
+ sb.append(parameter.toString());
+ if(requiresQuote){ sb.append("\""); }
+
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object other){
+ if(other.getClass() != StreamExpressionNamedParameter.class){
+ return false;
+ }
+
+ StreamExpressionNamedParameter check = (StreamExpressionNamedParameter)other;
+
+ if(null == this.name && null != check.name){
+ return false;
+ }
+ if(null != this.name && null == check.name){
+ return false;
+ }
+
+ if(null != this.name && null != check.name && !this.name.equals(check.name)){
+ return false;
+ }
+
+ return this.parameter.equals(check.parameter);
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java Mon May 11 12:37:18 2015
@@ -0,0 +1,25 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Base interface of a stream parameter
+ */
+public interface StreamExpressionParameter {
+
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java Mon May 11 12:37:18 2015
@@ -0,0 +1,313 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Takes a prefix notation expression and returns a tokenized expression
+ */
+public class StreamExpressionParser {
+
+
+ static char[] wordChars = {'_','.','-'};
+
+ static {
+ Arrays.sort(wordChars);
+ }
+
+ public static StreamExpression parse(String clause){
+ StreamExpressionParameter expr = generateStreamExpression(clause);
+ if(null != expr && expr instanceof StreamExpression){
+ return (StreamExpression)expr;
+ }
+
+ return null;
+ }
+
+ private static StreamExpressionParameter generateStreamExpression(String clause){
+ String working = clause.trim();
+
+ if(!isExpressionClause(working)){
+ throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper expression clause", working));
+ }
+
+ // Get functionName
+ int firstOpenParen = findNextClear(working, 0, '(');
+ StreamExpression expression = new StreamExpression(working.substring(0, firstOpenParen).trim());
+
+ // strip off functionName and ()
+ working = working.substring(firstOpenParen + 1,working.length() - 1).trim();
+ List<String> parts = splitOn(working,',');
+
+ for(int idx = 0; idx < parts.size(); ++idx){
+ String part = parts.get(idx).trim();
+ if(isExpressionClause(part)){
+ StreamExpressionParameter parameter = generateStreamExpression(part);
+ if(null != parameter){
+ expression.addParameter(parameter);
+ }
+ }
+ else if(isNamedParameterClause(part)){
+ StreamExpressionNamedParameter parameter = generateNamedParameterExpression(part);
+ if(null != parameter){
+ expression.addParameter(parameter);
+ }
+ }
+ else{
+ expression.addParameter(new StreamExpressionValue(part));
+ }
+ }
+
+ return expression;
+ }
+
+ private static StreamExpressionNamedParameter generateNamedParameterExpression(String clause){
+ String working = clause.trim();
+
+ // might be overkill as the only place this is called from does this check already
+ if(!isNamedParameterClause(working)){
+ throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper named parameter clause", working));
+ }
+
+ // Get name
+ int firstOpenEquals = findNextClear(working, 0, '=');
+ StreamExpressionNamedParameter namedParameter = new StreamExpressionNamedParameter(working.substring(0, firstOpenEquals).trim());
+
+ // we know this is ok because of the check in isNamedParameter
+ String parameter = working.substring(firstOpenEquals + 1, working.length());
+ if(isExpressionClause(parameter)){
+ namedParameter.setParameter(generateStreamExpression(parameter));
+ }
+ else{
+ // if wrapped in quotes, remove them
+ if(parameter.startsWith("\"") && parameter.endsWith("\"")){
+ parameter = parameter.substring(1, parameter.length() - 1).trim();
+ if(0 == parameter.length()){
+ throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper named parameter clause", working));
+ }
+ }
+ namedParameter.setParameter(new StreamExpressionValue(parameter));
+ }
+
+ return namedParameter;
+ }
+
+
+ /* Returns true if the clause is a valid expression clause. This is defined to
+ * mean it begins with ( and ends with )
+ * Expects that the passed in clause has already been trimmed of leading and
+ * trailing spaces*/
+ private static boolean isExpressionClause(String clause){
+ // operator(.....something.....)
+
+ // must be balanced
+ if(!isBalanced(clause)){ return false; }
+
+ // find first (, then check from start to that location and only accept alphanumeric
+ int firstOpenParen = findNextClear(clause, 0, '(');
+ if(firstOpenParen <= 0 || firstOpenParen == clause.length() - 1){ return false; }
+ String functionName = clause.substring(0, firstOpenParen).trim();
+ if(!wordToken(functionName)){ return false; }
+
+ // Must end with )
+ return clause.endsWith(")");
+ }
+
+ private static boolean isNamedParameterClause(String clause){
+ // name=thing
+
+ // find first = then check from start to that location and only accept alphanumeric
+ int firstOpenEquals = findNextClear(clause, 0, '=');
+ if(firstOpenEquals <= 0 || firstOpenEquals == clause.length() - 1){ return false; }
+ String name = clause.substring(0, firstOpenEquals);
+ if(!wordToken(name)){ return false; }
+
+ return true;
+ }
+
+ /* Finds index of the next char equal to findThis that is not within a quote or set of parens
+ * Does not work with the following values of findThis: " ' \ ) -- well, it might but wouldn't
+ * really give you what you want. Don't call with those characters */
+ private static int findNextClear(String clause, int startingIdx, char findThis){
+ int openParens = 0;
+ boolean isDoubleQuote = false;
+ boolean isSingleQuote = false;
+ boolean isEscaped = false;
+
+ for(int idx = startingIdx; idx < clause.length(); ++idx){
+ char c = clause.charAt(idx);
+
+ // if we're not in a non-escaped quote or paren state, then we've found the space we want
+ if(c == findThis && !isEscaped && !isSingleQuote && !isDoubleQuote && 0 == openParens){
+ return idx;
+ }
+
+
+ switch(c){
+ case '\\':
+ // We invert to support situations where \\ exists
+ isEscaped = !isEscaped;
+ break;
+
+ case '"':
+ // if we're not in a non-escaped single quote state, then invert the double quote state
+ if(!isEscaped && !isSingleQuote){
+ isDoubleQuote = !isDoubleQuote;
+ }
+ isEscaped = false;
+ break;
+
+ case '\'':
+ // if we're not in a non-escaped double quote state, then invert the single quote state
+ if(!isEscaped && !isDoubleQuote){
+ isSingleQuote = !isSingleQuote;
+ }
+ isEscaped = false;
+ break;
+
+ case '(':
+ // if we're not in a non-escaped quote state, then increment the # of open parens
+ if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+ openParens += 1;
+ }
+ isEscaped = false;
+ break;
+
+ case ')':
+ // if we're not in a non-escaped quote state, then decrement the # of open parens
+ if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+ openParens -= 1;
+ }
+ isEscaped = false;
+ break;
+ default:
+ isEscaped = false;
+ }
+ }
+
+ // Not found
+ return -1;
+ }
+
+
+ /* Returns a list of the tokens found. Assumed to be of the form
+ * 'foo bar baz' and not of the for '(foo bar baz)'
+ * 'foo bar (baz jaz)' is ok and will return three tokens of
+ * 'foo', 'bar', and '(baz jaz)'
+ */
+ private static List<String> splitOn(String clause, char splitOnThis){
+ String working = clause.trim();
+
+ List<String> parts = new ArrayList<String>();
+
+ while(true){ // will break when next splitOnThis isn't found
+ int nextIdx = findNextClear(working, 0, splitOnThis);
+
+ if(nextIdx < 0){
+ parts.add(working);
+ break;
+ }
+
+ parts.add(working.substring(0, nextIdx));
+
+ // handle ending splitOnThis
+ if(nextIdx+1 == working.length()){
+ break;
+ }
+
+ working = working.substring(nextIdx + 1).trim();
+ }
+
+ return parts;
+ }
+
+ /* Returns true if the clause has balanced parenthesis */
+ private static boolean isBalanced(String clause){
+ int openParens = 0;
+ boolean isDoubleQuote = false;
+ boolean isSingleQuote = false;
+ boolean isEscaped = false;
+
+ for(int idx = 0; idx < clause.length(); ++idx){
+ char c = clause.charAt(idx);
+
+ switch(c){
+ case '\\':
+ // We invert to support situations where \\ exists
+ isEscaped = !isEscaped;
+ break;
+
+ case '"':
+ // if we're not in a non-escaped single quote state, then invert the double quote state
+ if(!isEscaped && !isSingleQuote){
+ isDoubleQuote = !isDoubleQuote;
+ }
+ isEscaped = false;
+ break;
+
+ case '\'':
+ // if we're not in a non-escaped double quote state, then invert the single quote state
+ if(!isEscaped && !isDoubleQuote){
+ isSingleQuote = !isSingleQuote;
+ }
+ isEscaped = false;
+ break;
+
+ case '(':
+ // if we're not in a non-escaped quote state, then increment the # of open parens
+ if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+ openParens += 1;
+ }
+ isEscaped = false;
+ break;
+
+ case ')':
+ // if we're not in a non-escaped quote state, then decrement the # of open parens
+ if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+ openParens -= 1;
+
+ // If we're ever < 0 then we know we're not balanced
+ if(openParens < 0){
+ return false;
+ }
+ }
+ isEscaped = false;
+ break;
+
+ default:
+ isEscaped = false;
+ }
+ }
+
+ return (0 == openParens);
+ }
+
+ public static boolean wordToken(String token) {
+ for(int i=0; i<token.length(); i++) {
+ char c = token.charAt(i);
+ if (!Character.isLetterOrDigit(c) && Arrays.binarySearch(wordChars, c) < 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java Mon May 11 12:37:18 2015
@@ -0,0 +1,66 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Basic string stream expression
+ */
+public class StreamExpressionValue implements StreamExpressionParameter {
+
+ private String value;
+
+ public StreamExpressionValue(String value){
+ this.value = value;
+ }
+
+ public String getValue(){
+ return this.value;
+ }
+
+ public void setValue(String value){
+ this.value = value;
+ }
+
+ public StreamExpressionValue withValue(String value){
+ this.value = value;
+ return this;
+ }
+
+ @Override
+ public String toString(){
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object other){
+ if(other.getClass() != StreamExpressionValue.class){
+ return false;
+ }
+
+ StreamExpressionValue check = (StreamExpressionValue)other;
+
+ if(null == this.value && null == check.value){
+ return true;
+ }
+ if(null == this.value || null == check.value){
+ return false;
+ }
+
+ return this.value.equals(((StreamExpressionValue)other).value);
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Mon May 11 12:37:18 2015
@@ -0,0 +1,223 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Used to convert strings into stream expressions
+ */
+public class StreamFactory implements Serializable {
+
+ private transient HashMap<String,String> collectionZkHosts;
+ private transient HashMap<String,Class> streamFunctions;
+
+ public StreamFactory(){
+ collectionZkHosts = new HashMap<String,String>();
+ streamFunctions = new HashMap<String,Class>();
+ }
+
+ public StreamFactory withCollectionZkHost(String collectionName, String zkHost){
+ this.collectionZkHosts.put(collectionName, zkHost);
+ return this;
+ }
+ public String getCollectionZkHost(String collectionName){
+ return this.collectionZkHosts.getOrDefault(collectionName, null);
+ }
+
+ public Map<String,Class> getStreamFunctions(){
+ return streamFunctions;
+ }
+ public StreamFactory withStreamFunction(String streamFunction, Class clazz){
+ this.streamFunctions.put(streamFunction, clazz);
+ return this;
+ }
+
+ public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
+ if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){
+ return null;
+ }
+
+ return expression.getParameters().get(parameterIndex);
+ }
+
+ /** Given an expression, will return the value parameter at the given index, or null if doesn't exist */
+ public String getValueOperand(StreamExpression expression, int parameterIndex){
+ StreamExpressionParameter parameter = getOperand(expression, parameterIndex);
+ if(null != parameter){
+ if(parameter instanceof StreamExpressionValue){
+ return ((StreamExpressionValue)parameter).getValue();
+ }
+ }
+
+ return null;
+ }
+
+ public List<StreamExpressionNamedParameter> getNamedOperands(StreamExpression expression){
+ List<StreamExpressionNamedParameter> namedParameters = new ArrayList<StreamExpressionNamedParameter>();
+ for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)){
+ namedParameters.add((StreamExpressionNamedParameter)parameter);
+ }
+
+ return namedParameters;
+ }
+ public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name){
+ List<StreamExpressionNamedParameter> namedParameters = getNamedOperands(expression);
+ for(StreamExpressionNamedParameter param : namedParameters){
+ if(param.getName().equals(name)){
+ return param;
+ }
+ }
+
+ return null;
+ }
+
+ public List<StreamExpression> getExpressionOperands(StreamExpression expression){
+ List<StreamExpression> namedParameters = new ArrayList<StreamExpression>();
+ for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
+ namedParameters.add((StreamExpression)parameter);
+ }
+
+ return namedParameters;
+ }
+ public List<StreamExpression> getExpressionOperands(StreamExpression expression, String functionName){
+ List<StreamExpression> namedParameters = new ArrayList<StreamExpression>();
+ for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
+ StreamExpression expressionOperand = (StreamExpression)parameter;
+ if(expressionOperand.getFunctionName().equals(functionName)){
+ namedParameters.add(expressionOperand);
+ }
+ }
+
+ return namedParameters;
+ }
+ public List<StreamExpressionParameter> getOperandsOfType(StreamExpression expression, Class ... clazzes){
+ List<StreamExpressionParameter> parameters = new ArrayList<StreamExpressionParameter>();
+
+ parameterLoop:
+ for(StreamExpressionParameter parameter : expression.getParameters()){
+ for(Class clazz : clazzes){
+ if(!clazz.isAssignableFrom(parameter.getClass())){
+ continue parameterLoop; // go to the next parameter since this parameter cannot be assigned to at least one of the classes
+ }
+ }
+
+ parameters.add(parameter);
+ }
+
+ return parameters;
+ }
+
+ public List<StreamExpression> getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes){
+ List<StreamExpression> matchingStreamExpressions = new ArrayList<StreamExpression>();
+ List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
+
+ parameterLoop:
+ for(StreamExpression streamExpression : allStreamExpressions){
+ if(streamFunctions.containsKey(streamExpression.getFunctionName())){
+ for(Class clazz : clazzes){
+ if(!clazz.isAssignableFrom(streamFunctions.get(streamExpression.getFunctionName()))){
+ continue parameterLoop;
+ }
+ }
+
+ matchingStreamExpressions.add(streamExpression);
+ }
+ }
+
+ return matchingStreamExpressions;
+ }
+
+ public TupleStream constructStream(String expressionClause) throws IOException {
+ return constructStream(StreamExpressionParser.parse(expressionClause));
+ }
+ public TupleStream constructStream(StreamExpression expression) throws IOException{
+ String function = expression.getFunctionName();
+ if(streamFunctions.containsKey(function)){
+ Class clazz = streamFunctions.get(function);
+ if(ExpressibleStream.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+ TupleStream stream = (TupleStream)createInstance(streamFunctions.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+ return stream;
+ }
+ }
+
+ throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
+ }
+
+ public Comparator<Tuple> constructComparator(String comparatorString, Class comparatorType) throws IOException {
+ if(comparatorString.contains(",")){
+ String[] parts = comparatorString.split(",");
+ Comparator[] comps = new Comparator[parts.length];
+ for(int idx = 0; idx < parts.length; ++idx){
+ comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
+ }
+ return new MultiComp(comps);
+ }
+ else{
+ String[] parts = comparatorString.split(" ");
+ if(2 != parts.length){
+ throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting fieldName and order",comparatorString));
+ }
+
+ String fieldName = parts[0].trim();
+ String order = parts[1].trim();
+
+ return (Comparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
+ }
+ }
+
+ public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
+ // This should use SolrResourceLoader - TODO
+ // This is adding a restriction that the class has a public constructor - we may not want to do that
+ Constructor<T> ctor;
+ try {
+ ctor = clazz.getConstructor(paramTypes);
+ return ctor.newInstance(params);
+
+ } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
+ }
+ }
+
+ public String getFunctionName(Class clazz) throws IOException{
+ for(Entry<String,Class> entry : streamFunctions.entrySet()){
+ if(entry.getValue() == clazz){
+ return entry.getKey();
+ }
+ }
+
+ throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
+ }
+}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Expression language for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.stream.expr;
+
+
+
+
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Stream implementations for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.stream;
+
+
+
+
Modified: lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml (original)
+++ lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml Mon May 11 12:37:18 2015
@@ -52,8 +52,11 @@
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
+ <lst name="streamFunctions">
+ <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+ </lst>
</requestHandler>
-
+
<requestDispatcher handleSelect="true" >
<requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="2048" />
</requestDispatcher>
Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,98 @@
+package org.apache.solr.client.solrj.io.stream;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+public class CountStream extends TupleStream implements ExpressibleStream, Serializable {
+
+ private TupleStream stream;
+ private int count;
+
+ public CountStream(TupleStream stream) {
+ this.stream = stream;
+ }
+
+ public CountStream(StreamExpression expression, StreamFactory factory) throws IOException{
+ List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+
+ // validate expression contains only what we want.
+ if(expression.getParameters().size() != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+ }
+
+ if(1 != streamExpressions.size()){
+ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+ }
+
+ stream = factory.constructStream(streamExpressions.get(0));
+ }
+
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+ // function name
+ StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+ // stream
+ if(stream instanceof ExpressibleStream){
+ expression.addParameter(((ExpressibleStream)stream).toExpression(factory));
+ }
+ else{
+ throw new IOException("This CountStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+ }
+
+ return expression;
+ }
+
+ public void close() throws IOException {
+ this.stream.close();
+ }
+
+ public void open() throws IOException {
+ this.stream.open();
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> l = new ArrayList();
+ l.add(stream);
+ return l;
+ }
+
+ public void setStreamContext(StreamContext streamContext) {
+ stream.setStreamContext(streamContext);
+ }
+
+ public Tuple read() throws IOException {
+ Tuple t = stream.read();
+ if(t.EOF) {
+ t.put("count", count);
+ return t;
+ } else {
+ ++count;
+ return t;
+ }
+ }
+}
\ No newline at end of file