You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/05/11 14:37:19 UTC

svn commit: r1678743 [2/3] - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/response/ server/solr/configsets/data_driven_schema_configs/conf/ solrj/src/java/org/apache/solr/client/solrj/io/ solrj/src/jav...

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ *  Iterates over a TupleStream and buffers Tuples that are equal based on a comparator.
+ *  This allows tuples to be grouped by common field(s).
+ *
+ *  The read() method emits one tuple per group. The fields of the emitted Tuple reflect the first tuple
+ *  encountered in the group.
+ *
+ *  Use the Tuple.getMaps() method to return all the Tuples in the group. This method returns
+ *  a list of maps (including the group head), which hold the data for each Tuple in the group.
+ *
+ *  Note: The ReducerStream requires that its underlying stream be sorted and partitioned by the same
+ *  fields as its comparator.
+ *
+ **/
+
+public class ReducerStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  // Wrapped in a PushBackStream so that the tuple which ends a group (the first tuple of the
+  // next group, or the EOF marker) can be handed back and seen again by the next read().
+  private PushBackStream tupleStream;
+  private Comparator<Tuple> comp;
+
+  // First tuple of the group currently being buffered; null when no group is in progress.
+  private transient Tuple currentGroupHead;
+
+  /**
+   * Creates a ReducerStream that groups the tuples of the given stream.
+   *
+   * @param tupleStream the stream to group
+   * @param comp comparator for which a result of 0 means "same group"
+   */
+  public ReducerStream(TupleStream tupleStream,
+                       Comparator<Tuple> comp) {
+    this.tupleStream = new PushBackStream(tupleStream);
+    this.comp = comp;
+  }
+
+  /**
+   * Creates a ReducerStream from a stream expression that holds exactly one stream operand and
+   * one named "by" operand.
+   *
+   * @param expression the expression to build this stream from
+   * @param factory factory used to construct the inner stream and the comparator
+   * @throws IOException if the expression has unexpected operands, does not contain exactly one
+   *         stream, or has no "by" value
+   */
+  public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+    StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
+
+    // validate expression contains only what we want.
+    if(expression.getParameters().size() != streamExpressions.size() + 1){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if(1 != streamExpressions.size()){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+    this.tupleStream = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
+
+    if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
+    }
+
+    // Reducing is always done over equality, so always use an EqualTo comparator
+    this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class);
+  }
+
+  /**
+   * Converts this stream back into an expression: the function name, the inner stream and the
+   * "by" comparator.
+   *
+   * @throws IOException if the comparator is not an ExpressibleComparator
+   */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // stream
+    expression.addParameter(tupleStream.toExpression(factory));
+
+    // by
+    if(comp instanceof ExpressibleComparator){
+      expression.addParameter(new StreamExpressionNamedParameter("by",((ExpressibleComparator)comp).toExpression(factory)));
+    }
+    else{
+      throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+
+    return expression;
+  }
+
+  /** Passes the context on to the wrapped stream. */
+  @Override
+  public void setStreamContext(StreamContext context) {
+    this.tupleStream.setStreamContext(context);
+  }
+
+  /** Returns a new list holding the single wrapped (push-back) stream. */
+  @Override
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList<TupleStream>();
+    l.add(tupleStream);
+    return l;
+  }
+
+  @Override
+  public void open() throws IOException {
+    tupleStream.open();
+  }
+
+  @Override
+  public void close() throws IOException {
+    tupleStream.close();
+  }
+
+  /**
+   * Reads the next group from the wrapped stream.
+   *
+   * Tuples are buffered while they compare equal (0) to the first tuple of the group. The returned
+   * Tuple is a copy of that first tuple with the maps of all group members attached through
+   * setMaps(). When the wrapped stream is exhausted and no group is pending, the EOF tuple itself
+   * is returned.
+   */
+  @Override
+  public Tuple read() throws IOException {
+
+    List<Map> maps = new ArrayList<Map>();
+    while(true) {
+      Tuple t = tupleStream.read();
+
+      if(t.EOF) {
+        if(maps.isEmpty()) {
+          return t;
+        }
+
+        // Flush the last group; push EOF back so that the next read() returns it.
+        tupleStream.pushBack(t);
+        // Reset the head so that no stale group state survives the end of the stream.
+        currentGroupHead = null;
+        Map headFields = new HashMap();
+        headFields.putAll(maps.get(0));
+        Tuple groupHead = new Tuple(headFields);
+        groupHead.setMaps(maps);
+        return groupHead;
+      }
+
+      if(currentGroupHead == null) {
+        currentGroupHead = t;
+        maps.add(t.getMap());
+      } else if(comp.compare(currentGroupHead, t) == 0) {
+        maps.add(t.getMap());
+      } else {
+        // t starts the next group: hand it back and emit the finished group.
+        Tuple groupHead = currentGroupHead.clone();
+        tupleStream.pushBack(t);
+        currentGroupHead = null;
+        groupHead.setMaps(maps);
+        return groupHead;
+      }
+    }
+  }
+
+  @Override
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+*  Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
+**/
+
+public class SolrStream extends TupleStream {
+
+  private static final long serialVersionUID = 1;
+
+  private String baseUrl;
+  private Map params;
+  private int numWorkers;
+  private int workerID;
+  private boolean trace;
+  private Map<String, String> fieldMappings;
+  private transient JSONTupleStream jsonTupleStream;
+  private transient HttpSolrClient client;
+  private transient SolrClientCache cache;
+
+  /**
+   * @param baseUrl base URL used to create or look up the HttpSolrClient
+   * @param params request parameters; every entry is sent as a String (keys must be Strings,
+   *               values must be non-null)
+   */
+  public SolrStream(String baseUrl, Map params) {
+    this.baseUrl = baseUrl;
+    this.params = params;
+  }
+
+  /**
+   * Sets field renames applied to every tuple read: each key is the field name to replace and
+   * each value is the name it is stored under instead.
+   */
+  public void setFieldMappings(Map<String, String> fieldMappings) {
+    this.fieldMappings = fieldMappings;
+  }
+
+  /** This stream wraps no other stream, so the returned list is always empty. */
+  @Override
+  public List<TupleStream> children() {
+    return new ArrayList<TupleStream>();
+  }
+
+  public String getBaseUrl() {
+    return baseUrl;
+  }
+
+  /** Takes the worker count, worker id and client cache from the context. */
+  @Override
+  public void setStreamContext(StreamContext context) {
+    this.numWorkers = context.numWorkers;
+    this.workerID = context.workerID;
+    this.cache = context.getSolrClientCache();
+  }
+
+  /**
+  * Opens the stream to a single Solr instance.
+  *
+  * The client is taken from the SolrClientCache when a stream context supplied one; otherwise a
+  * new client is created, which close() then closes.
+  **/
+  @Override
+  public void open() throws IOException {
+
+    if(cache == null) {
+      client = new HttpSolrClient(baseUrl);
+    } else {
+      client = cache.getHttpSolrClient(baseUrl);
+    }
+
+    try {
+      jsonTupleStream = JSONTupleStream.create(client, loadParams(params));
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   *  Setting trace to true will include the "_CORE_" field in each Tuple emitted by the stream.
+   **/
+
+  public void setTrace(boolean trace) {
+    this.trace = trace;
+  }
+
+  /**
+   * Copies the request parameters into SolrParams. A hash partition filter ("fq") is added when
+   * "partitionKeys" is present and is not "none".
+   *
+   * @throws IOException if more than one worker is configured but "partitionKeys" is missing
+   */
+  private SolrParams loadParams(Map params) throws IOException {
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    if(params.containsKey("partitionKeys")) {
+      if(!params.get("partitionKeys").equals("none")) {
+        String partitionFilter = getPartitionFilter();
+        solrParams.add("fq", partitionFilter);
+      }
+    } else {
+      if(numWorkers > 1) {
+        throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
+      }
+    }
+
+    // params is a raw Map, so the entries are cast explicitly instead of through an unchecked iterator.
+    for(Object o : params.entrySet()) {
+      Map.Entry entry = (Map.Entry)o;
+      solrParams.add((String)entry.getKey(), entry.getValue().toString());
+    }
+
+    return solrParams;
+  }
+
+  /** Builds the "{!hash ...}" filter that selects this worker's share of the results. */
+  private String getPartitionFilter() {
+    StringBuilder buf = new StringBuilder("{!hash workers=");
+    buf.append(this.numWorkers);
+    buf.append(" worker=");
+    buf.append(this.workerID);
+    buf.append("}");
+    return buf.toString();
+  }
+
+  /**
+  *  Closes the Stream to a single Solr Instance.
+  *
+  *  Safe to call when open() was never called or failed part way through.
+  * */
+  @Override
+  public void close() throws IOException {
+    try {
+      if(jsonTupleStream != null) {
+        jsonTupleStream.close();
+      }
+    } finally {
+      // Only close a client this stream created itself; cached clients are not ours to close.
+      if(cache == null && client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+  * Reads a Tuple from the stream. The Stream is completed when Tuple.EOF == true.
+  **/
+  @Override
+  public Tuple read() throws IOException {
+    Map fields = jsonTupleStream.next();
+
+    if(fields == null) {
+      //Return the EOF tuple.
+      Map m = new HashMap();
+      m.put("EOF", true);
+      return new Tuple(m);
+    }
+
+    // Tag only real tuples; next() returns null at the end of the stream, so this must come
+    // after the null check.
+    if(trace) {
+      fields.put("_CORE_", this.baseUrl);
+    }
+
+    if(fieldMappings != null) {
+      fields = mapFields(fields, fieldMappings);
+    }
+    return new Tuple(fields);
+  }
+
+  /**
+   * Renames fields in place: for every mapping the value stored under the key name is moved to
+   * the value name. Returns the same map instance that was passed in.
+   */
+  private Map mapFields(Map fields, Map<String,String> mappings) {
+
+    for(Map.Entry<String,String> entry : mappings.entrySet()) {
+      String mapFrom = entry.getKey();
+      String mapTo = entry.getValue();
+      Object o = fields.get(mapFrom);
+      fields.remove(mapFrom);
+      fields.put(mapTo, o);
+    }
+
+    return fields;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java Mon May 11 12:37:18 2015
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ *  The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
+ *  The StreamContext is used to pass shared context to concentrically wrapped TupleStreams.
+ *
+ *  Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for reuse
+ *  across multiple TupleStreams.
+ **/
+
+
+public class StreamContext implements Serializable{
+
+  // Explicit id, matching the other Serializable classes in this package.
+  private static final long serialVersionUID = 1;
+
+  // Free-form shared entries, accessed through get() and put().
+  private Map<Object, Object> entries = new HashMap<Object, Object>();
+  public int workerID;
+  public int numWorkers;
+  private SolrClientCache clientCache;
+  private StreamFactory streamFactory;
+
+  /** Returns the value stored under the key, or null when there is none. */
+  public Object get(Object key) {
+    return entries.get(key);
+  }
+
+  /** Stores a value under the key, replacing any previous value. */
+  public void put(Object key, Object value) {
+    this.entries.put(key, value);
+  }
+
+  public void setSolrClientCache(SolrClientCache clientCache) {
+    this.clientCache = clientCache;
+  }
+
+  /** Returns the client cache, or null when none has been set. */
+  public SolrClientCache getSolrClientCache() {
+    return this.clientCache;
+  }
+
+  public void setStreamFactory(StreamFactory streamFactory) {
+    this.streamFactory = streamFactory;
+  }
+
+  /** Returns the stream factory, or null when none has been set. */
+  public StreamFactory getStreamFactory() {
+    return this.streamFactory;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Base class of all streams in this package: a serializable source of Tuples that is opened,
+ * read one Tuple at a time, and closed.
+ */
+public abstract class TupleStream implements Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  public TupleStream() {
+
+  }
+  
+  /** Hands the shared StreamContext to this stream. */
+  public abstract void setStreamContext(StreamContext context);
+
+  /** Returns the streams this stream wraps; the list may be empty. */
+  public abstract List<TupleStream> children();
+
+  /** Opens the stream. */
+  public abstract void open() throws IOException;
+
+  /** Closes the stream. */
+  public abstract void close() throws IOException;
+
+  /** Reads the next Tuple; the end of the stream is signalled by a Tuple whose EOF flag is true. */
+  public abstract Tuple read() throws IOException;
+
+  // Returns 0 unless overridden. NOTE(review): what the cost value measures is not visible here.
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+
+/**
+ * The UniqueStream emits a unique stream of Tuples based on a Comparator.
+ *
+ * Note: The sort order of the underlying stream must match the Comparator.
+ **/
+
+public class UniqueStream extends TupleStream implements ExpressibleStream {
+
+  private static final long serialVersionUID = 1;
+
+  private TupleStream tupleStream;
+  private Comparator<Tuple> comp;
+  // Last tuple emitted by read(); null until the first tuple has been emitted.
+  private transient Tuple currentTuple;
+
+  /**
+   * Wraps the given stream and drops every tuple that compares equal (0) to the tuple emitted
+   * before it.
+   */
+  public UniqueStream(TupleStream tupleStream, Comparator<Tuple> comp) {
+    this.tupleStream = tupleStream;
+    this.comp = comp;
+  }
+
+  /**
+   * Builds the stream from an expression holding exactly one stream operand and one named
+   * "over" operand.
+   *
+   * @throws IOException if the expression has unexpected operands, does not contain exactly one
+   *         stream, or has no "over" value
+   */
+  public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException {
+    // pull the operands out of the expression
+    List<StreamExpression> streams = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+    StreamExpressionNamedParameter over = factory.getNamedOperand(expression, "over");
+
+    // anything beyond the streams plus the single named operand is unknown
+    int expectedOperands = streams.size() + 1;
+    if(expression.getParameters().size() != expectedOperands){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if(streams.size() != 1){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streams.size()));
+    }
+    this.tupleStream = factory.constructStream(streams.get(0));
+
+    boolean hasOverValue = over != null && over.getParameter() instanceof StreamExpressionValue;
+    if(!hasOverValue){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
+    }
+
+    // Uniqueness is always done over equality, so always use an EqualTo comparator
+    StreamExpressionValue overValue = (StreamExpressionValue)over.getParameter();
+    this.comp = factory.constructComparator(overValue.getValue(), FieldComparator.class);
+  }
+
+  /**
+   * Converts this stream back into an expression: the function name, the inner stream and the
+   * "over" comparator.
+   *
+   * @throws IOException if the inner stream or the comparator is not expressible
+   */
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression result = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // streams
+    if(!(tupleStream instanceof ExpressibleStream)){
+      throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+    result.addParameter(((ExpressibleStream)tupleStream).toExpression(factory));
+
+    // over
+    if(!(comp instanceof ExpressibleComparator)){
+      throw new IOException("This UniqueStream contains a non-expressible comparator - it cannot be converted to an expression");
+    }
+    StreamExpressionParameter overExpression = ((ExpressibleComparator)comp).toExpression(factory);
+    result.addParameter(new StreamExpressionNamedParameter("over", overExpression));
+
+    return result;
+  }
+
+  public void setComp(Comparator<Tuple> comp) {
+    this.comp = comp;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.tupleStream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> wrapped = new ArrayList<TupleStream>();
+    wrapped.add(tupleStream);
+    return wrapped;
+  }
+
+  public void open() throws IOException {
+    tupleStream.open();
+  }
+
+  public void close() throws IOException {
+    tupleStream.close();
+  }
+
+  /**
+   * Returns the next tuple that differs from the previously emitted one, or the EOF tuple once
+   * the wrapped stream is exhausted.
+   */
+  public Tuple read() throws IOException {
+    Tuple next = tupleStream.read();
+
+    // Skip over duplicates of the tuple emitted last.
+    while(!next.EOF && currentTuple != null && comp.compare(currentTuple, next) == 0) {
+      next = tupleStream.read();
+    }
+
+    // The EOF tuple is passed through without becoming the comparison base.
+    if(!next.EOF) {
+      this.currentTuple = next;
+    }
+    return next;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpression.java Mon May 11 12:37:18 2015
@@ -0,0 +1,127 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Expression containing a function and set of parameters
+ */
+public class StreamExpression implements StreamExpressionParameter {
+  private String functionName;
+  private List<StreamExpressionParameter> parameters;
+
+  /** Creates an expression with the given function name and an empty parameter list. */
+  public StreamExpression(String functionName){
+    this.functionName = functionName;
+    parameters = new ArrayList<StreamExpressionParameter>();
+  }
+
+  public String getFunctionName(){
+    return this.functionName;
+  }
+  /** @throws IllegalArgumentException if functionName is null */
+  public void setFunctionName(String functionName){
+    if(null == functionName){
+      throw new IllegalArgumentException("Null functionName is not allowed.");
+    }
+
+    this.functionName = functionName;
+  }
+  /** Same as setFunctionName, returning this expression for chaining. */
+  public StreamExpression withFunctionName(String functionName){
+    setFunctionName(functionName);
+    return this;
+  }
+
+  public void addParameter(StreamExpressionParameter parameter){
+    this.parameters.add(parameter);
+  }
+  /** Adds the string wrapped in a StreamExpressionValue. */
+  public void addParameter(String parameter){
+    addParameter(new StreamExpressionValue(parameter));
+  }
+
+  /** Appends the parameter and returns this expression for chaining. */
+  public StreamExpression withParameter(StreamExpressionParameter parameter){
+    this.parameters.add(parameter);
+    return this;
+  }
+  public StreamExpression withParameter(String parameter){
+    return withParameter(new StreamExpressionValue(parameter));
+  }
+
+  /** Returns the live parameter list, not a copy. */
+  public List<StreamExpressionParameter> getParameters(){
+    return this.parameters;
+  }
+  /** @throws IllegalArgumentException if parameters is null */
+  public void setParameters(List<StreamExpressionParameter> parameters){
+    if(null == parameters){
+      throw new IllegalArgumentException("Null parameter list is not allowed.");
+    }
+
+    this.parameters = parameters;
+  }
+  public StreamExpression withParameters(List<StreamExpressionParameter> parameters){
+    setParameters(parameters);
+    return this;
+  }
+
+  /** Renders the expression as functionName(param1,param2,...). */
+  @Override
+  public String toString(){
+    // append() instead of new StringBuilder(functionName): the constructor accepts a null
+    // name, and toString() must not throw for it.
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.functionName);
+
+    sb.append("(");
+    for(int idx = 0; idx < parameters.size(); ++idx){
+      if(0 != idx){ sb.append(","); }
+      sb.append(parameters.get(idx));
+    }
+    sb.append(")");
+
+    return sb.toString();
+  }
+
+  /**
+   * Two expressions are equal when they are both exactly of class StreamExpression, have equal
+   * function names (both may be null) and have pairwise equal parameters in the same order.
+   */
+  @Override
+  public boolean equals(Object other){
+    if(this == other){
+      return true;
+    }
+    if(null == other || other.getClass() != StreamExpression.class){
+      return false;
+    }
+
+    StreamExpression check = (StreamExpression)other;
+
+    boolean sameName = (null == this.functionName)
+        ? null == check.functionName
+        : this.functionName.equals(check.functionName);
+    if(!sameName){
+      return false;
+    }
+
+    // List.equals compares size and then elements pairwise, tolerating null elements.
+    return this.parameters.equals(check.parameters);
+  }
+
+  /**
+   * Consistent with equals. Only the function name and the number of parameters are hashed, so
+   * the result does not depend on whether the parameter implementations override hashCode.
+   */
+  @Override
+  public int hashCode(){
+    int result = (null == functionName) ? 0 : functionName.hashCode();
+    return 31 * result + parameters.size();
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionNamedParameter.java Mon May 11 12:37:18 2015
@@ -0,0 +1,111 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides a named parameter
+ */
+public class StreamExpressionNamedParameter implements StreamExpressionParameter {
+  private String name;
+  private StreamExpressionParameter parameter;
+
+  /** Creates a named parameter without a value; the value stays null until setParameter is called. */
+  public StreamExpressionNamedParameter(String name){
+    this.name = name;
+  }
+  /** Creates a named parameter whose value is the string wrapped in a StreamExpressionValue. */
+  public StreamExpressionNamedParameter(String name, String parameter){
+    this.name = name;
+    setParameter(parameter);
+  }
+  public StreamExpressionNamedParameter(String name, StreamExpressionParameter parameter){
+    this.name = name;
+    setParameter(parameter);
+  }
+
+  public String getName(){
+    return this.name;
+  }
+  /** @throws IllegalArgumentException if name is null or empty */
+  public void setName(String name){
+    if(null == name || 0 == name.length()){
+      throw new IllegalArgumentException("Null or empty name is not allowed.");
+    }
+
+    this.name = name;
+  }
+
+  /** Returns the value, or null when none has been set. */
+  public StreamExpressionParameter getParameter(){
+    return this.parameter;
+  }
+  public void setParameter(StreamExpressionParameter parameter){
+    this.parameter = parameter;
+  }
+  public StreamExpressionNamedParameter withParameter(StreamExpressionParameter parameter){
+    setParameter(parameter);
+    return this;
+  }
+  public void setParameter(String parameter){
+    this.parameter = new StreamExpressionValue(parameter);
+  }
+  public StreamExpressionNamedParameter withParameter(String parameter){
+    setParameter(parameter);
+    return this;
+  }
+
+  /**
+   * Renders the parameter as name=value. A plain value is wrapped in double quotes when
+   * StreamExpressionParser.wordToken rejects it.
+   */
+  @Override
+  public String toString(){
+    // append() renders a null name or value as "null" instead of throwing.
+    StringBuilder sb = new StringBuilder();
+    sb.append(name);
+    sb.append("=");
+
+    // check if we require quoting
+    boolean requiresQuote = false;
+    if(parameter instanceof StreamExpressionValue){
+      String value = ((StreamExpressionValue)parameter).getValue();
+      requiresQuote = !StreamExpressionParser.wordToken(value);
+    }
+
+    if(requiresQuote){ sb.append("\""); }
+    sb.append(parameter);
+    if(requiresQuote){ sb.append("\""); }
+
+    return sb.toString();
+  }
+
+  /**
+   * Two named parameters are equal when they are both exactly of this class and have equal names
+   * and equal values; either may be null on both sides.
+   */
+  @Override
+  public boolean equals(Object other){
+    if(this == other){
+      return true;
+    }
+    if(null == other || other.getClass() != StreamExpressionNamedParameter.class){
+      return false;
+    }
+
+    StreamExpressionNamedParameter check = (StreamExpressionNamedParameter)other;
+
+    boolean sameName = (null == this.name) ? null == check.name : this.name.equals(check.name);
+    if(!sameName){
+      return false;
+    }
+
+    // The value is null when the single-argument constructor was used.
+    return (null == this.parameter)
+        ? null == check.parameter
+        : this.parameter.equals(check.parameter);
+  }
+
+  /**
+   * Consistent with equals. Only the name is hashed, so the result does not depend on whether
+   * the value's class overrides hashCode.
+   */
+  @Override
+  public int hashCode(){
+    return (null == name) ? 0 : name.hashCode();
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParameter.java Mon May 11 12:37:18 2015
@@ -0,0 +1,25 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Base interface of a stream parameter.
+ * <p>
+ * Declares no methods; it only provides a common type for the things that can appear
+ * as a parameter of a stream expression. {@code StreamExpressionValue} implements it,
+ * and {@code StreamExpressionParser} / {@code StreamFactory} also handle
+ * {@code StreamExpression} and {@code StreamExpressionNamedParameter} through this type.
+ */
+public interface StreamExpressionParameter {
+  
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionParser.java Mon May 11 12:37:18 2015
@@ -0,0 +1,313 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Takes a prefix notation expression and returns a tokenized expression
+ */
+public class StreamExpressionParser {
+
+
+  static char[] wordChars = {'_','.','-'};
+
+  static {
+    Arrays.sort(wordChars);
+  }
+
+  public static StreamExpression parse(String clause){
+    StreamExpressionParameter expr = generateStreamExpression(clause);
+    if(null != expr && expr instanceof StreamExpression){
+      return (StreamExpression)expr;
+    }
+    
+    return null;
+  }
+  
+  private static StreamExpressionParameter generateStreamExpression(String clause){    
+    String working = clause.trim();
+    
+    if(!isExpressionClause(working)){
+      throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper expression clause", working));
+    }
+    
+    // Get functionName
+    int firstOpenParen = findNextClear(working, 0, '(');
+    StreamExpression expression = new StreamExpression(working.substring(0, firstOpenParen).trim());
+
+    // strip off functionName and ()
+    working = working.substring(firstOpenParen + 1,working.length() - 1).trim();
+    List<String> parts = splitOn(working,',');
+        
+    for(int idx = 0; idx < parts.size(); ++idx){
+      String part = parts.get(idx).trim();
+      if(isExpressionClause(part)){
+        StreamExpressionParameter parameter = generateStreamExpression(part);
+        if(null != parameter){
+          expression.addParameter(parameter);
+        }
+      }
+      else if(isNamedParameterClause(part)){
+        StreamExpressionNamedParameter parameter = generateNamedParameterExpression(part);
+        if(null != parameter){
+          expression.addParameter(parameter);
+        }
+      }
+      else{
+        expression.addParameter(new StreamExpressionValue(part));
+      }
+    }
+    
+    return expression;
+  }
+
+  private static StreamExpressionNamedParameter generateNamedParameterExpression(String clause){    
+    String working = clause.trim();
+    
+    // might be overkill as the only place this is called from does this check already
+    if(!isNamedParameterClause(working)){
+      throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper named parameter clause", working));
+    }
+    
+    // Get name
+    int firstOpenEquals = findNextClear(working, 0, '=');
+    StreamExpressionNamedParameter namedParameter = new StreamExpressionNamedParameter(working.substring(0, firstOpenEquals).trim());
+
+    // we know this is ok because of the check in isNamedParameter
+    String parameter = working.substring(firstOpenEquals + 1, working.length());
+    if(isExpressionClause(parameter)){
+      namedParameter.setParameter(generateStreamExpression(parameter));
+    }
+    else{
+      // if wrapped in quotes, remove them
+      if(parameter.startsWith("\"") && parameter.endsWith("\"")){
+        parameter = parameter.substring(1, parameter.length() - 1).trim();
+        if(0 == parameter.length()){
+          throw new IllegalArgumentException(String.format(Locale.ROOT,"'%s' is not a proper named parameter clause", working));
+        }
+      }
+      namedParameter.setParameter(new StreamExpressionValue(parameter));
+    }
+    
+    return namedParameter;
+  }
+
+  
+  /* Returns true if the clause is a valid expression clause. This is defined to
+   * mean it begins with ( and ends with )
+   * Expects that the passed in clause has already been trimmed of leading and
+   * trailing spaces*/
+  private static boolean isExpressionClause(String clause){
+    // operator(.....something.....)
+    
+    // must be balanced
+    if(!isBalanced(clause)){ return false; }
+    
+    // find first (, then check from start to that location and only accept alphanumeric
+    int firstOpenParen = findNextClear(clause, 0, '(');
+    if(firstOpenParen <= 0 || firstOpenParen == clause.length() - 1){ return false; }
+    String functionName = clause.substring(0, firstOpenParen).trim();
+    if(!wordToken(functionName)){ return false; }
+    
+    // Must end with )
+    return clause.endsWith(")");
+  }
+  
+  private static boolean isNamedParameterClause(String clause){
+    // name=thing
+    
+    // find first = then check from start to that location and only accept alphanumeric
+    int firstOpenEquals = findNextClear(clause, 0, '=');
+    if(firstOpenEquals <= 0 || firstOpenEquals == clause.length() - 1){ return false; }
+    String name = clause.substring(0, firstOpenEquals);
+    if(!wordToken(name)){ return false; }
+    
+    return true;
+  }
+  
+  /* Finds index of the next char equal to findThis that is not within a quote or set of parens
+   * Does not work with the following values of findThis: " ' \ ) -- well, it might but wouldn't
+   * really give you what you want. Don't call with those characters */
+  private static int findNextClear(String clause, int startingIdx, char findThis){
+    int openParens = 0;
+    boolean isDoubleQuote = false;
+    boolean isSingleQuote = false;
+    boolean isEscaped = false;
+    
+    for(int idx = startingIdx; idx < clause.length(); ++idx){
+      char c = clause.charAt(idx);
+      
+      // if we're not in a non-escaped quote or paren state, then we've found the space we want
+      if(c == findThis && !isEscaped && !isSingleQuote && !isDoubleQuote && 0 == openParens){
+        return idx;
+      }
+      
+      
+      switch(c){
+        case '\\':
+          // We invert to support situations where \\ exists
+          isEscaped = !isEscaped;
+          break;
+          
+        case '"':
+          // if we're not in a non-escaped single quote state, then invert the double quote state
+          if(!isEscaped && !isSingleQuote){
+            isDoubleQuote = !isDoubleQuote;
+          }
+          isEscaped = false;
+          break;
+        
+        case '\'':
+          // if we're not in a non-escaped double quote state, then invert the single quote state
+          if(!isEscaped && !isDoubleQuote){
+            isSingleQuote = !isSingleQuote;
+          }
+          isEscaped = false;
+          break;
+          
+        case '(':
+          // if we're not in a non-escaped quote state, then increment the # of open parens
+          if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+            openParens += 1;
+          }
+          isEscaped = false;
+          break;
+          
+        case ')':
+          // if we're not in a non-escaped quote state, then decrement the # of open parens
+          if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+            openParens -= 1;
+          }
+          isEscaped = false;
+          break;
+        default:
+          isEscaped = false;
+      }
+    }
+
+    // Not found
+    return -1;    
+  }
+
+  
+  /* Returns a list of the tokens found. Assumed to be of the form
+   * 'foo bar baz' and not of the for '(foo bar baz)'
+   * 'foo bar (baz jaz)' is ok and will return three tokens of
+   * 'foo', 'bar', and '(baz jaz)'
+   */
+  private static List<String> splitOn(String clause, char splitOnThis){
+    String working = clause.trim();
+    
+    List<String> parts = new ArrayList<String>();
+    
+    while(true){ // will break when next splitOnThis isn't found
+      int nextIdx = findNextClear(working, 0, splitOnThis);
+      
+      if(nextIdx < 0){
+        parts.add(working);
+        break;
+      }
+      
+      parts.add(working.substring(0, nextIdx));
+      
+      // handle ending splitOnThis
+      if(nextIdx+1 == working.length()){
+        break;
+      }
+      
+      working = working.substring(nextIdx + 1).trim();      
+    }
+    
+    return parts;
+  }
+  
+  /* Returns true if the clause has balanced parenthesis */
+  private static boolean isBalanced(String clause){
+    int openParens = 0;
+    boolean isDoubleQuote = false;
+    boolean isSingleQuote = false;
+    boolean isEscaped = false;
+    
+    for(int idx = 0; idx < clause.length(); ++idx){
+      char c = clause.charAt(idx);
+      
+      switch(c){
+        case '\\':
+          // We invert to support situations where \\ exists
+          isEscaped = !isEscaped;
+          break;
+          
+        case '"':
+          // if we're not in a non-escaped single quote state, then invert the double quote state
+          if(!isEscaped && !isSingleQuote){
+            isDoubleQuote = !isDoubleQuote;
+          }
+          isEscaped = false;
+          break;
+        
+        case '\'':
+          // if we're not in a non-escaped double quote state, then invert the single quote state
+          if(!isEscaped && !isDoubleQuote){
+            isSingleQuote = !isSingleQuote;
+          }
+          isEscaped = false;
+          break;
+          
+        case '(':
+          // if we're not in a non-escaped quote state, then increment the # of open parens
+          if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+            openParens += 1;
+          }
+          isEscaped = false;
+          break;
+          
+        case ')':
+          // if we're not in a non-escaped quote state, then decrement the # of open parens
+          if(!isEscaped && !isSingleQuote && !isDoubleQuote){
+            openParens -= 1;
+            
+            // If we're ever < 0 then we know we're not balanced
+            if(openParens < 0){
+              return false;
+            }
+          }
+          isEscaped = false;
+          break;
+          
+        default:
+          isEscaped = false;
+      }
+    }
+
+    return (0 == openParens);
+  }
+
+  public static boolean wordToken(String token) {
+    for(int i=0; i<token.length(); i++) {
+      char c = token.charAt(i);
+      if (!Character.isLetterOrDigit(c) && Arrays.binarySearch(wordChars, c) < 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExpressionValue.java Mon May 11 12:37:18 2015
@@ -0,0 +1,66 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Basic string stream expression: a parameter that is nothing more than a string value.
+ */
+public class StreamExpressionValue implements StreamExpressionParameter {
+  
+  private String value;
+  
+  /**
+   * @param value the string held by this parameter, may be null
+   */
+  public StreamExpressionValue(String value){
+    this.value = value;
+  }
+  
+  /**
+   * @return the string held by this parameter, may be null
+   */
+  public String getValue(){
+    return this.value;
+  }
+  
+  /**
+   * @param value the new string to hold, may be null
+   */
+  public void setValue(String value){
+    this.value = value;
+  }
+  
+  /**
+   * Chaining variant of {@link #setValue(String)}.
+   *
+   * @param value the new string to hold, may be null
+   * @return this instance
+   */
+  public StreamExpressionValue withValue(String value){
+    this.value = value;
+    return this;
+  }
+  
+  /**
+   * @return the held string as is (null if the value is null)
+   */
+  @Override
+  public String toString(){
+    return this.value;
+  }
+  
+  /**
+   * Two values are equal when the other object is exactly a
+   * {@code StreamExpressionValue} and both held strings are null or equal.
+   *
+   * @param other object to compare with, may be null
+   * @return true if other is an equal value, false otherwise (including null)
+   */
+  @Override
+  public boolean equals(Object other){
+    // equals(null) must answer false rather than throw
+    if(null == other || other.getClass() != StreamExpressionValue.class){
+      return false;
+    }
+    
+    StreamExpressionValue check = (StreamExpressionValue)other;
+    
+    if(null == this.value || null == check.value){
+      return null == this.value && null == check.value;
+    }
+    
+    return this.value.equals(check.value);
+  }
+  
+  /**
+   * Hash code consistent with {@link #equals(Object)}: derived only from the held string.
+   *
+   * @return the hash code of the held string, or 0 if it is null
+   */
+  @Override
+  public int hashCode(){
+    return (null == this.value) ? 0 : this.value.hashCode();
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java Mon May 11 12:37:18 2015
@@ -0,0 +1,223 @@
+package org.apache.solr.client.solrj.io.stream.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.ExpressibleComparator;
+import org.apache.solr.client.solrj.io.comp.MultiComp;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Used to convert strings into stream expressions, and stream expressions into
+ * {@link TupleStream} and {@link Comparator} instances.
+ * <p>
+ * Holds a mapping of function name to stream class and of collection name to zkHost,
+ * plus helpers for picking operands out of a parsed {@link StreamExpression}.
+ */
+public class StreamFactory implements Serializable {
+  
+  // NOTE(review): both maps are transient while the class is Serializable, so an instance
+  // obtained through Java deserialization would see them as null - confirm this is intended
+  private transient HashMap<String,String> collectionZkHosts;
+  private transient HashMap<String,Class> streamFunctions;
+  
+  public StreamFactory(){
+    collectionZkHosts = new HashMap<String,String>();
+    streamFunctions = new HashMap<String,Class>();
+  }
+  
+  /** Registers the zkHost to use for the given collection and returns this factory */
+  public StreamFactory withCollectionZkHost(String collectionName, String zkHost){
+    this.collectionZkHosts.put(collectionName, zkHost);
+    return this;
+  }
+  /** Returns the zkHost registered for the given collection, or null if none is registered */
+  public String getCollectionZkHost(String collectionName){
+    return this.collectionZkHosts.get(collectionName);
+  }
+  
+  /** Returns the live (not copied) mapping of function name to stream class */
+  public Map<String,Class> getStreamFunctions(){
+    return streamFunctions;
+  }
+  /** Maps the given function name to the given class and returns this factory */
+  public StreamFactory withStreamFunction(String streamFunction, Class clazz){
+    this.streamFunctions.put(streamFunction, clazz);
+    return this;
+  }
+  
+  /** Given an expression, will return the parameter at the given index, or null if the index is out of range */
+  public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
+    if(null == expression.getParameters() || parameterIndex < 0 || parameterIndex >= expression.getParameters().size()){
+      return null;
+    }
+    
+    return expression.getParameters().get(parameterIndex);
+  }
+  
+  /** Given an expression, will return the value parameter at the given index, or null if doesn't exist */
+  public String getValueOperand(StreamExpression expression, int parameterIndex){
+    StreamExpressionParameter parameter = getOperand(expression, parameterIndex);
+    // instanceof is false for null, which covers a missing parameter
+    if(parameter instanceof StreamExpressionValue){
+      return ((StreamExpressionValue)parameter).getValue();
+    }
+    
+    return null;
+  }
+  
+  /** Returns all named parameters of the expression, in parameter order */
+  public List<StreamExpressionNamedParameter> getNamedOperands(StreamExpression expression){
+    List<StreamExpressionNamedParameter> namedParameters = new ArrayList<StreamExpressionNamedParameter>();
+    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)){
+      namedParameters.add((StreamExpressionNamedParameter)parameter);
+    }
+    
+    return namedParameters;
+  }
+  /** Returns the first named parameter with the given name, or null if there is none */
+  public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name){
+    List<StreamExpressionNamedParameter> namedParameters = getNamedOperands(expression);
+    for(StreamExpressionNamedParameter param : namedParameters){
+      if(param.getName().equals(name)){
+        return param;
+      }
+    }
+    
+    return null;
+  }
+  
+  /** Returns all parameters of the expression which are themselves expressions, in parameter order */
+  public List<StreamExpression> getExpressionOperands(StreamExpression expression){
+    List<StreamExpression> expressionOperands = new ArrayList<StreamExpression>();
+    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
+      expressionOperands.add((StreamExpression)parameter);
+    }
+    
+    return expressionOperands;
+  }
+  /** Returns all expression parameters whose function name equals the given function name */
+  public List<StreamExpression> getExpressionOperands(StreamExpression expression, String functionName){
+    List<StreamExpression> expressionOperands = new ArrayList<StreamExpression>();
+    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
+      StreamExpression expressionOperand = (StreamExpression)parameter;
+      if(expressionOperand.getFunctionName().equals(functionName)){
+        expressionOperands.add(expressionOperand);
+      }
+    }
+    
+    return expressionOperands;
+  }
+  /** Returns the parameters which are assignable to every one of the given classes; empty if there are no parameters */
+  public List<StreamExpressionParameter> getOperandsOfType(StreamExpression expression, Class ... clazzes){
+    List<StreamExpressionParameter> parameters = new ArrayList<StreamExpressionParameter>();
+    
+    // getOperand treats a null parameter list as "no parameters", do the same here
+    if(null == expression.getParameters()){
+      return parameters;
+    }
+    
+    parameterLoop:
+    for(StreamExpressionParameter parameter : expression.getParameters()){
+      for(Class clazz : clazzes){
+        if(!clazz.isAssignableFrom(parameter.getClass())){
+          continue parameterLoop; // go to the next parameter since this parameter cannot be assigned to at least one of the classes
+        }
+      }
+      
+      parameters.add(parameter);
+    }
+    
+    return parameters;
+  }
+  
+  /** Returns the expression parameters whose function name is registered and whose
+   * registered class is assignable to every one of the given classes */
+  public List<StreamExpression> getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes){
+    List<StreamExpression> matchingStreamExpressions = new ArrayList<StreamExpression>();
+    List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
+    
+    parameterLoop:
+    for(StreamExpression streamExpression : allStreamExpressions){
+      if(streamFunctions.containsKey(streamExpression.getFunctionName())){
+        for(Class clazz : clazzes){
+          if(!clazz.isAssignableFrom(streamFunctions.get(streamExpression.getFunctionName()))){
+            continue parameterLoop;
+          }
+        }
+        
+        matchingStreamExpressions.add(streamExpression);
+      }
+    }
+    
+    return matchingStreamExpressions;
+  }
+  
+  /** Parses the clause and constructs the stream it describes, see {@link #constructStream(StreamExpression)} */
+  public TupleStream constructStream(String expressionClause) throws IOException {
+    return constructStream(StreamExpressionParser.parse(expressionClause));
+  }
+  /**
+   * Constructs the stream registered for the expression's function name by calling the
+   * public constructor of the registered class taking (StreamExpression, StreamFactory).
+   *
+   * @throws IOException if the function name is not registered, the registered class is
+   *         not both an ExpressibleStream and a TupleStream, or the instance cannot be constructed
+   */
+  public TupleStream constructStream(StreamExpression expression) throws IOException{
+    String function = expression.getFunctionName();
+    if(streamFunctions.containsKey(function)){
+      Class clazz = streamFunctions.get(function);
+      if(ExpressibleStream.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+        TupleStream stream = (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
+        return stream;
+      }
+    }
+    
+    throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
+  }
+
+  /**
+   * Constructs a comparator from a string of the form {@code fieldName order}, or a
+   * {@link MultiComp} from a comma separated list of such strings. Surrounding
+   * whitespace and repeated whitespace between fieldName and order are tolerated.
+   *
+   * @param comparatorString comparator description, e.g. {@code "a asc"} or {@code "a asc, b desc"}
+   * @param comparatorType class with a public constructor taking (String, ComparatorOrder)
+   * @throws IOException if a part is not exactly a fieldName and an order, or the instance cannot be constructed
+   */
+  public Comparator<Tuple> constructComparator(String comparatorString, Class comparatorType) throws IOException {
+    if(comparatorString.contains(",")){
+      String[] parts = comparatorString.split(",");
+      Comparator[] comps = new Comparator[parts.length];
+      for(int idx = 0; idx < parts.length; ++idx){
+        comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
+      }
+      return new MultiComp(comps);
+    }
+    else{
+      // split on any run of whitespace so "  a   asc " is read the same as "a asc"
+      String[] parts = comparatorString.trim().split("\\s+");
+      if(2 != parts.length){
+        throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting fieldName and order",comparatorString));
+      }
+      
+      String fieldName = parts[0];
+      String order = parts[1];
+      
+      return (Comparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
+    }
+  }
+    
+  /**
+   * Creates an instance of clazz through its public constructor with the given parameter types.
+   *
+   * @throws IOException wrapping the reflective failure if the constructor cannot be found or invoked
+   */
+  public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
+    // This should use SolrResourceLoader - TODO
+    // This is adding a restriction that the class has a public constructor - we may not want to do that
+    try {
+      Constructor<T> ctor = clazz.getConstructor(paramTypes);
+      return ctor.newInstance(params);
+      
+    } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
+    }
+  }
+  
+  /**
+   * Returns a function name registered for exactly the given class.
+   *
+   * @throws IOException if no function name is registered for the class
+   */
+  public String getFunctionName(Class clazz) throws IOException{
+    for(Entry<String,Class> entry : streamFunctions.entrySet()){
+      if(entry.getValue() == clazz){
+        return entry.getKey();
+      }
+    }
+    
+    throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
+  }
+}

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Expression language for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.stream.expr;
+
+
+
+

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/package-info.java Mon May 11 12:37:18 2015
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+/**
+ * Stream implementations for the Streaming Aggregation API
+ **/
+package org.apache.solr.client.solrj.io.stream;
+
+
+
+

Modified: lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml?rev=1678743&r1=1678742&r2=1678743&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml (original)
+++ lucene/dev/trunk/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml Mon May 11 12:37:18 2015
@@ -52,8 +52,11 @@
       <str name="wt">json</str>
       <str name="distrib">false</str>
     </lst>
+    <lst name="streamFunctions">
+      <str name="count">org.apache.solr.client.solrj.io.stream.CountStream</str>
+    </lst>
   </requestHandler>
-
+  
   <requestDispatcher handleSelect="true" >
     <requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="2048" />
   </requestDispatcher>

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java?rev=1678743&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java Mon May 11 12:37:18 2015
@@ -0,0 +1,136 @@
+package  org.apache.solr.client.solrj.io.stream;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Test-only stream decorator that counts the tuples read from a wrapped stream.
+ * Every tuple is passed through unchanged; when the wrapped stream returns its EOF
+ * tuple, the number of tuples read before it is stored on that tuple under "count".
+ */
+public class CountStream extends TupleStream implements ExpressibleStream, Serializable {
+
+  private TupleStream stream;
+  private int count;
+
+  /**
+   * Wraps an already constructed stream.
+   *
+   * @param stream the stream whose tuples are counted
+   */
+  public CountStream(TupleStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * Builds the stream from an expression whose only parameter is a stream expression.
+   *
+   * @param expression the expression describing this stream
+   * @param factory factory used to find and construct the wrapped stream
+   * @throws IOException if the expression has operands that are not streams, or
+   *         does not contain exactly one stream
+   */
+  public CountStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class);
+
+    // validate expression contains only what we want.
+    if (expression.getParameters().size() != streamExpressions.size()) {
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    }
+
+    if (1 != streamExpressions.size()) {
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
+    }
+
+    stream = factory.constructStream(streamExpressions.get(0));
+  }
+
+  /**
+   * Renders this stream as an expression wrapping the inner stream's expression.
+   *
+   * @param factory factory used to look up the function name for this class
+   * @return the expression for this stream
+   * @throws IOException if the wrapped stream is not an ExpressibleStream
+   */
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // stream
+    if (!(stream instanceof ExpressibleStream)) {
+      throw new IOException("This CountStream contains a non-expressible TupleStream - it cannot be converted to an expression");
+    }
+    expression.addParameter(((ExpressibleStream) stream).toExpression(factory));
+
+    return expression;
+  }
+
+  /** Closes the wrapped stream. */
+  @Override
+  public void close() throws IOException {
+    this.stream.close();
+  }
+
+  /** Opens the wrapped stream. */
+  @Override
+  public void open() throws IOException {
+    this.stream.open();
+  }
+
+  /** Returns a new list containing only the wrapped stream. */
+  @Override
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<TupleStream>();
+    l.add(stream);
+    return l;
+  }
+
+  /** Passes the stream context on to the wrapped stream. */
+  @Override
+  public void setStreamContext(StreamContext streamContext) {
+    stream.setStreamContext(streamContext);
+  }
+
+  /**
+   * Reads the next tuple from the wrapped stream.
+   *
+   * @return the tuple read; the EOF tuple additionally carries the running total under "count"
+   * @throws IOException if the wrapped stream fails to read
+   */
+  @Override
+  public Tuple read() throws IOException {
+    Tuple t = stream.read();
+    if (t.EOF) {
+      // the end marker carries the number of tuples seen before it
+      t.put("count", count);
+    } else {
+      ++count;
+    }
+    return t;
+  }
+}
\ No newline at end of file