You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2015/07/21 22:28:35 UTC

svn commit: r1692193 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/response/ core/src/java/org/apache/solr/search/ core/src/test-files/solr/collection1/conf/ core/src/test/org/apache/solr/handler/ ser...

Author: jbernste
Date: Tue Jul 21 20:28:35 2015
New Revision: 1692193

URL: http://svn.apache.org/r1692193
Log:
SOLR-7441: Improve overall robustness of the Streaming stack: Streaming API, Streaming Expressions, Parallel SQL

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java
    lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
    lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
    lucene/dev/trunk/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SQLHandler.java Tue Jul 21 20:28:35 2015
@@ -19,7 +19,6 @@ package org.apache.solr.handler;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Locale;
@@ -43,7 +42,9 @@ import org.apache.solr.client.solrj.io.s
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.io.stream.ExceptionStream;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
@@ -63,9 +64,8 @@ import com.facebook.presto.sql.parser.Sq
 
 public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
 
-  private Map<String, TableSpec> tableMappings = new HashMap();
-  private String defaultZkhost = null;
-  private String defaultWorkerCollection = null;
+  private static String defaultZkhost = null;
+  private static String defaultWorkerCollection = null;
 
   private Logger logger = LoggerFactory.getLogger(SQLHandler.class);
 
@@ -77,41 +77,42 @@ public class SQLHandler extends RequestH
       defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
       defaultWorkerCollection = core.getCoreDescriptor().getCollectionName();
     }
-
-    NamedList<String> tableConf = (NamedList<String>)initArgs.get("tables");
-
-    for(Entry<String,String> entry : tableConf) {
-      String tableName = entry.getKey();
-      if(entry.getValue().indexOf("@") > -1) {
-        String[] parts = entry.getValue().split("@");
-        tableMappings.put(tableName, new TableSpec(parts[0], parts[1]));
-      } else {
-        String collection = entry.getValue();
-        tableMappings.put(tableName, new TableSpec(collection, defaultZkhost));
-      }
-    }
   }
 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     SolrParams params = req.getParams();
+    params = adjustParams(params);
+    req.setParams(params);
     String sql = params.get("sql");
     int numWorkers = params.getInt("numWorkers", 1);
     String workerCollection = params.get("workerCollection", defaultWorkerCollection);
     String workerZkhost = params.get("workerZkhost",defaultZkhost);
     StreamContext context = new StreamContext();
     try {
-      TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost);
+
+      if(sql == null) {
+        throw new Exception("sql parameter cannot be null");
+      }
+
+      TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost);
       context.numWorkers = numWorkers;
       context.setSolrClientCache(StreamHandler.clientCache);
       tupleStream.setStreamContext(context);
-      rsp.add("tuples", new ExceptionStream(tupleStream));
+      rsp.add("result-set", new StreamHandler.TimerStream(new ExceptionStream(tupleStream)));
     } catch(Exception e) {
       //Catch the SQL parsing and query transformation exceptions.
-      logger.error("Exception parsing SQL", e);
-      rsp.add("tuples", new StreamHandler.DummyErrorStream(e));
+      SolrException.log(logger, e);
+      rsp.add("result-set", new StreamHandler.DummyErrorStream(e));
     }
   }
 
+  private SolrParams adjustParams(SolrParams params) {
+    ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
+    adjustedParams.add(params);
+    adjustedParams.add(CommonParams.OMIT_HEADER, "true");
+    return adjustedParams;
+  }
+
   public String getDescription() {
     return "SQLHandler";
   }
@@ -123,7 +124,6 @@ public class SQLHandler extends RequestH
   public static class SQLTupleStreamParser {
 
     public static TupleStream parse(String sql,
-                                    Map<String, TableSpec> tableMap,
                                     int numWorkers,
                                     String workerCollection,
                                     String workerZkhost) throws IOException {
@@ -137,30 +137,33 @@ public class SQLHandler extends RequestH
       TupleStream sqlStream = null;
 
       if(sqlVistor.groupByQuery) {
-        sqlStream = doGroupBy(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost);
+        sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
       } else {
-        sqlStream = doSelect(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost);
+        sqlStream = doSelect(sqlVistor);
       }
 
       return sqlStream;
     }
   }
 
-  private static TupleStream doGroupBy(SQLVisitor sqlVisitor,
-                                       Map<String, TableSpec> tableMap,
-                                       int numWorkers,
-                                       String workerCollection,
-                                       String workerZkHost) throws IOException {
+  private static TupleStream doGroupByWithAggregates(SQLVisitor sqlVisitor,
+                                                     int numWorkers,
+                                                     String workerCollection,
+                                                     String workerZkHost) throws IOException {
 
     Set<String> fieldSet = new HashSet();
     Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
     Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
+    if(metrics.length == 0) {
+      throw new IOException("Group by queries must include atleast one aggregate function.");
+    }
 
     String fl = fields(fieldSet);
     String sortDirection = getSortDirection(sqlVisitor.sorts);
     String sort = bucketSort(buckets, sortDirection);
 
-    TableSpec tableSpec = tableMap.get(sqlVisitor.table);
+    TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+
     String zkHost = tableSpec.zkHost;
     String collection = tableSpec.collection;
     Map<String, String> params = new HashMap();
@@ -229,17 +232,36 @@ public class SQLHandler extends RequestH
     return tupleStream;
   }
 
-  private static TupleStream doSelect(SQLVisitor sqlVisitor,
-                                      Map<String, TableSpec> tableMap,
-                                      int numWorkers,
-                                      String workerCollection,
-                                      String workerZkHost) throws IOException {
+  private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
     List<String> fields = sqlVisitor.fields;
     StringBuilder flbuf = new StringBuilder();
     boolean comma = false;
-    for(String field : fields) {
 
-      if(comma) {
+    if(fields.size() == 0) {
+      throw new IOException("Select columns must be specified.");
+    }
+
+    boolean score = false;
+
+    for (String field : fields) {
+
+      if(field.contains("(")) {
+        throw new IOException("Aggregate functions only supported with group by queries.");
+      }
+
+      if(field.contains("*")) {
+        throw new IOException("* is not supported for column selection.");
+      }
+
+      if(field.equals("score")) {
+        if(sqlVisitor.limit < 0) {
+          throw new IOException("score is not a valid field for unlimited select queries");
+        } else {
+          score = true;
+        }
+      }
+
+      if (comma) {
         flbuf.append(",");
       }
 
@@ -254,21 +276,37 @@ public class SQLHandler extends RequestH
     StringBuilder siBuf = new StringBuilder();
 
     comma = false;
-    for(SortItem sortItem : sorts) {
-      if(comma) {
-        siBuf.append(",");
+
+    if(sorts != null) {
+      for (SortItem sortItem : sorts) {
+        if (comma) {
+          siBuf.append(",");
+        }
+        siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
+      }
+    } else {
+      if(sqlVisitor.limit < 0) {
+       throw new IOException("order by is required for unlimited select statements.");
+      } else {
+        siBuf.append("score desc");
+        if(!score) {
+          fl = fl+(",score");
+        }
       }
-      siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
     }
 
-    TableSpec tableSpec = tableMap.get(sqlVisitor.table);
+    TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
+
     String zkHost = tableSpec.zkHost;
     String collection = tableSpec.collection;
     Map<String, String> params = new HashMap();
 
     params.put("fl", fl.toString());
     params.put("q", sqlVisitor.query);
-    params.put("sort", siBuf.toString());
+
+    if(siBuf.length() > 0) {
+      params.put("sort", siBuf.toString());
+    }
 
     if(sqlVisitor.limit > -1) {
       params.put("rows", Integer.toString(sqlVisitor.limit));
@@ -384,15 +422,15 @@ public class SQLHandler extends RequestH
     return buf.toString();
   }
 
-  private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) {
+  private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) throws IOException {
     List<Metric> metrics = new ArrayList();
     for(String field : fields) {
-
       if(field.contains("(")) {
 
         field = field.substring(0, field.length()-1);
         String[] parts = field.split("\\(");
         String function = parts[0];
+        validateFunction(function);
         String column = parts[1];
         if(function.equals("min")) {
           metrics.add(new MinMetric(column));
@@ -414,6 +452,14 @@ public class SQLHandler extends RequestH
     return metrics.toArray(new Metric[metrics.size()]);
   }
 
+  private static void validateFunction(String function) throws IOException {
+    if(function.equals("min") || function.equals("max") || function.equals("sum") || function.equals("avg") || function.equals("count")) {
+      return;
+    } else {
+      throw new IOException("Invalid function: "+function);
+    }
+  }
+
   private static Bucket[] getBuckets(List<String> fields, Set<String> fieldSet) {
     List<Bucket> buckets = new ArrayList();
     for(String field : fields) {
@@ -466,13 +512,19 @@ public class SQLHandler extends RequestH
   }
 
 
-  private class TableSpec {
+  private static class TableSpec {
     private String collection;
     private String zkHost;
 
-    public TableSpec(String collection, String zkHost) {
-      this.collection = collection;
-      this.zkHost = zkHost;
+    public TableSpec(String table, String defaultZkHost) {
+      if(table.contains("@")) {
+        String[] parts = table.split("@");
+        this.collection = parts[0];
+        this.zkHost = parts[1];
+      } else {
+        this.collection = table;
+        this.zkHost = defaultZkHost;
+      }
     }
   }
 
@@ -496,7 +548,14 @@ public class SQLHandler extends RequestH
     protected Void visitComparisonExpression(ComparisonExpression node, StringBuilder buf) {
       String field = node.getLeft().toString();
       String value = node.getRight().toString();
-      buf.append('(').append(stripQuotes(field) + ":" + stripSingleQuotes(value)).append(')');
+      value = stripSingleQuotes(value);
+
+      if(!value.startsWith("(") && !value.startsWith("[")) {
+        //If no parens default to a phrase search.
+        value = '"'+value+'"';
+      }
+
+      buf.append('(').append(stripQuotes(field) + ":" + value).append(')');
       return null;
     }
   }
@@ -805,9 +864,9 @@ public class SQLHandler extends RequestH
         case EQUAL:
           return td == d;
         case GREATER_THAN:
-          return td <= d;
+          return td > d;
         case GREATER_THAN_OR_EQUAL:
-          return td <= d;
+          return td >= d;
         default:
           return false;
       }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/StreamHandler.java Tue Jul 21 20:28:35 2015
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
 
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
+import java.io.IOException;
 import java.net.URLDecoder;
 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,9 @@ import org.apache.solr.client.solrj.io.s
 import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.Base64;
 import org.apache.solr.common.util.NamedList;
@@ -129,7 +133,8 @@ public class StreamHandler extends Reque
 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     SolrParams params = req.getParams();
-
+    params = adjustParams(params);
+    req.setParams(params);
     boolean objectSerialize = params.getBool("objectSerialize", false);
     TupleStream tupleStream = null;
 
@@ -146,8 +151,8 @@ public class StreamHandler extends Reque
       }
     } catch (Exception e) {
       //Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
-      logger.error("Exception creating TupleStream", e);
-      rsp.add("tuples", new DummyErrorStream(e));
+      SolrException.log(logger, e);
+      rsp.add("result-set", new DummyErrorStream(e));
 
       return;
     }
@@ -159,7 +164,14 @@ public class StreamHandler extends Reque
     context.numWorkers = numWorkers;
     context.setSolrClientCache(clientCache);
     tupleStream.setStreamContext(context);
-    rsp.add("tuples", new ExceptionStream(tupleStream));
+    rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
+  }
+
+  private SolrParams adjustParams(SolrParams params) {
+    ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
+    adjustedParams.add(params);
+    adjustedParams.add(CommonParams.OMIT_HEADER, "true");
+    return adjustedParams;
   }
 
   public String getDescription() {
@@ -198,8 +210,49 @@ public class StreamHandler extends Reque
       String msg = e.getMessage();
       Map m = new HashMap();
       m.put("EOF", true);
-      m.put("_EXCEPTION_", msg);
+      m.put("EXCEPTION", msg);
       return new Tuple(m);
     }
   }
+
+  public static class TimerStream extends TupleStream {
+
+    private long begin;
+    private TupleStream tupleStream;
+
+    public TimerStream(TupleStream tupleStream) {
+      this.tupleStream = tupleStream;
+    }
+
+    public StreamComparator getStreamSort() {
+      return this.tupleStream.getStreamSort();
+    }
+
+    public void close() throws IOException {
+      this.tupleStream.close();
+    }
+
+    public void open() throws IOException {
+      this.begin = System.nanoTime();
+      this.tupleStream.open();
+    }
+
+    public void setStreamContext(StreamContext context) {
+      this.tupleStream.setStreamContext(context);
+    }
+
+    public List<TupleStream> children() {
+      return this.tupleStream.children();
+    }
+
+    public Tuple read() throws IOException {
+      Tuple tuple = this.tupleStream.read();
+      if(tuple.EOF) {
+        long totalTime = (System.nanoTime() - begin) / 1000000;
+        tuple.fields.put("RESPONSE_TIME", totalTime);
+      }
+      return tuple;
+    }
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java Tue Jul 21 20:28:35 2015
@@ -37,6 +37,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.LongValues;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
@@ -72,7 +73,7 @@ public class SortingResponseWriter imple
     Exception e1 = res.getException();
     if(e1 != null) {
       if(!(e1 instanceof IgnoreException)) {
-        e1.printStackTrace(new PrintWriter(writer));
+        writeException(e1, writer, false);
       }
       return;
     }
@@ -128,16 +129,15 @@ public class SortingResponseWriter imple
       exception = e;
     }
 
-    writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":[");
 
     if(exception != null) {
-      //We have an exception. Send it back to the client and return.
-      writeException(exception, writer);
-      writer.write("]}}");
-      writer.flush();
+      writeException(exception, writer, true);
       return;
     }
 
+    writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":[");
+
+
     //Write the data.
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
     SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
@@ -189,6 +189,7 @@ public class SortingResponseWriter imple
         }
       } catch(Throwable e) {
         Throwable ex = e;
+        e.printStackTrace();
         while(ex != null) {
           String m = ex.getMessage();
           if(m != null && m.contains("Broken pipe")) {
@@ -242,10 +243,16 @@ public class SortingResponseWriter imple
     }
   }
 
-  protected void writeException(Exception e, Writer out) throws IOException{
-    out.write("{\"_EXCEPTION_\":\"");
+  protected void writeException(Exception e, Writer out, boolean log) throws IOException{
+    out.write("{\"responseHeader\": {\"status\": 400}, \"response\":{\"numFound\":0, \"docs\":[");
+    out.write("{\"EXCEPTION\":\"");
     writeStr(e.getMessage(), out);
     out.write("\"}");
+    out.write("]}}");
+    out.flush();
+    if(log) {
+      SolrException.log(logger, e);
+    }
   }
 
   protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
@@ -1138,6 +1145,7 @@ public class SortingResponseWriter imple
 
     public void setCurrentValue(int docId) {
       int ord = currentVals.getOrd(docId);
+
       if(ord < 0) {
         currentOrd = -1;
       } else {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Tue Jul 21 20:28:35 2015
@@ -293,20 +293,21 @@ public abstract class TextResponseWriter
 
   public void writeTupleStream(TupleStream tupleStream) throws IOException {
     tupleStream.open();
-    writeStartDocumentList("response", -1, -1, -1, null);
+    tupleStream.writeStreamOpen(writer);
     boolean isFirst = true;
     while(true) {
       Tuple tuple = tupleStream.read();
       if(!isFirst) {
         writer.write(",");
       }
+      writer.write("\n");
       writeMap(null, tuple.fields, false, true);
       isFirst = false;
       if(tuple.EOF) {
         break;
       }
     }
-    writeEndDocumentList();
+    tupleStream.writeStreamClose(writer);
     tupleStream.close();
   }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java Tue Jul 21 20:28:35 2015
@@ -62,14 +62,12 @@ public class ExportQParserPlugin extends
 
   public class ExportQuery extends RankQuery {
     
-    private int leafCount;
     private Query mainQuery;
     private Object id;
 
     public RankQuery clone() {
       ExportQuery clone = new ExportQuery();
       clone.id = id;
-      clone.leafCount = leafCount;
       return clone;
     }
 
@@ -98,7 +96,8 @@ public class ExportQParserPlugin extends
     public TopDocsCollector getTopDocsCollector(int len,
                                                 SolrIndexSearcher.QueryCommand cmd,
                                                 IndexSearcher searcher) throws IOException {
-      FixedBitSet[] sets = new FixedBitSet[this.leafCount];
+      int leafCount = searcher.getTopReaderContext().leaves().size();
+      FixedBitSet[] sets = new FixedBitSet[leafCount];
       return new ExportCollector(sets);
     }
 
@@ -124,7 +123,6 @@ public class ExportQParserPlugin extends
     }
     
     public ExportQuery(SolrParams localParams, SolrParams params, SolrQueryRequest request) throws IOException {
-      this.leafCount = request.getSearcher().getTopReaderContext().leaves().size();
       id = new Object();
     }
   }

Modified: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml (original)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml Tue Jul 21 20:28:35 2015
@@ -71,10 +71,6 @@
       <str name="wt">json</str>
       <str name="distrib">false</str>
     </lst>
-    <lst name="tables">
-      <str name="mytable">collection1</str>
-    </lst>
-
   </requestHandler>
 
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java Tue Jul 21 20:28:35 2015
@@ -102,7 +102,7 @@ public class TestSQLHandler extends Abst
     SQLHandler.SQLVisitor sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("(c:d)"));
+    assert(sqlVistor.query.equals("(c:\"d\")"));
 
     //Add parens
     parser = new SqlParser();
@@ -111,11 +111,11 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("(c:d)"));
+    assert(sqlVistor.query.equals("(c:\"d\")"));
 
     //Phrase
     parser = new SqlParser();
-    sql = "select a from b where (c = '\"d d\"')";
+    sql = "select a from b where (c = 'd d')";
     statement = parser.createStatement(sql);
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
@@ -129,7 +129,7 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:d) AND (l:z))"));
+    assert(sqlVistor.query.equals("((c:\"d\") AND (l:\"z\"))"));
 
     // OR
 
@@ -139,7 +139,7 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:d) OR (l:z))"));
+    assert(sqlVistor.query.equals("((c:\"d\") OR (l:\"z\"))"));
 
     // AND NOT
 
@@ -149,7 +149,7 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:d) AND -(l:z))"));
+    assert(sqlVistor.query.equals("((c:\"d\") AND -(l:\"z\"))"));
 
     // NESTED
     parser = new SqlParser();
@@ -158,7 +158,7 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND (m:j)))"));
+    assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND (m:\"j\")))"));
 
     // NESTED NOT
     parser = new SqlParser();
@@ -167,33 +167,33 @@ public class TestSQLHandler extends Abst
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND -(m:j)))"));
+    assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND -(m:\"j\")))"));
 
     // RANGE - Will have to do until SQL BETWEEN is supported.
     // NESTED
     parser = new SqlParser();
-    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z') AND (m = 'j')))";
+    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z)') AND (m = 'j')))";
     statement = parser.createStatement(sql);
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
 
-    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z) AND (m:j)))"));
+    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z)) AND (m:\"j\")))"));
 
     // Wildcard
     parser = new SqlParser();
-    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = 'j')))";
+    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = 'j')))";
     statement = parser.createStatement(sql);
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
-    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:j)))"));
+    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:\"j\")))"));
 
     // Complex Lucene/Solr Query
     parser = new SqlParser();
-    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = '(j OR (k NOT s))')))";
+    sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = '(j OR (k NOT s))')))";
     statement = parser.createStatement(sql);
     sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
     sqlVistor.process(statement, new Integer(0));
-    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:(j OR (k NOT s)))))"));
+    assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:(j OR (k NOT s)))))"));
   }
 
   private void testBasicSelect() throws Exception {
@@ -216,7 +216,7 @@ public class TestSQLHandler extends Abst
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc");
+      params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -267,7 +267,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc limit 1");
+      params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc limit 1");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -281,7 +281,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' AND id='(1 2 3)' order by field_i desc");
+      params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' AND id='(1 2 3)' order by field_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -326,9 +326,10 @@ public class TestSQLHandler extends Abst
       indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
+
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_iff desc");
+      params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_iff desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       Tuple tuple = getTuple(new ExceptionStream(solrStream));
@@ -339,18 +340,18 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select id, field_iff, str_s from mytable where text='XXXX' order by field_iff desc");
+      params.put("sql", "select id, field_iff, str_s from collection1 where text='XXXX' order by field_iff desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuple = getTuple(new ExceptionStream(solrStream));
       assert(tuple.EOF);
       assert(tuple.EXCEPTION);
       //An exception not detected by the parser thrown from the /select handler
-      assert(tuple.getException().contains("An exception has occurred on the server, refer to server log for details"));
+      assert(tuple.getException().contains("sort param field can't be found:"));
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
+      params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
 
       solrStream = new SolrStream(jetty.url, params);
       tuple = getTuple(new ExceptionStream(solrStream));
@@ -359,6 +360,27 @@ public class TestSQLHandler extends Abst
       //An exception not detected by the parser thrown from the /export handler
       assert(tuple.getException().contains("undefined field:"));
 
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select str_s, count(*), blah(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuple = getTuple(new ExceptionStream(solrStream));
+      assert(tuple.EOF);
+      assert(tuple.EXCEPTION);
+      //An exception not detected by the parser thrown from the /export handler
+      assert(tuple.getException().contains("Invalid function: blah"));
+
+      params = new HashMap();
+      params.put(CommonParams.QT, "/sql");
+      params.put("sql", "select str_s from collection1 where text='XXXX' group by str_s");
+
+      solrStream = new SolrStream(jetty.url, params);
+      tuple = getTuple(new ExceptionStream(solrStream));
+      assert(tuple.EOF);
+      assert(tuple.EXCEPTION);
+      assert(tuple.getException().contains("Group by queries must include atleast one aggregate function."));
+
     } finally {
       delete();
     }
@@ -384,7 +406,7 @@ public class TestSQLHandler extends Abst
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -412,7 +434,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where (text='XXXX' AND NOT text='\"XXXX XXX\"') group by str_s order by str_s desc");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -449,7 +471,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -466,7 +488,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -484,7 +506,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -518,7 +540,7 @@ public class TestSQLHandler extends Abst
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -547,7 +569,7 @@ public class TestSQLHandler extends Abst
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by str_s desc");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by str_s desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -584,7 +606,7 @@ public class TestSQLHandler extends Abst
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -611,7 +633,7 @@ public class TestSQLHandler extends Abst
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -630,7 +652,7 @@ public class TestSQLHandler extends Abst
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", "2");
-      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
+      params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -663,7 +685,7 @@ public class TestSQLHandler extends Abst
       commit();
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc");
+      params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -681,7 +703,7 @@ public class TestSQLHandler extends Abst
       assert(tuple.getLong("year_i") == 2014);
       assert(tuple.getDouble("sum(item_i)") == 7);
 
-      params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc");
+      params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -708,7 +730,7 @@ public class TestSQLHandler extends Abst
 
       params = new HashMap();
       params.put(CommonParams.QT, "/sql");
-      params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
+      params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -781,7 +803,7 @@ public class TestSQLHandler extends Abst
       Map params = new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", 2);
-      params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc");
+      params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, params);
       List<Tuple> tuples = getTuples(solrStream);
@@ -802,7 +824,7 @@ public class TestSQLHandler extends Abst
       new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", 2);
-      params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc");
+      params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);
@@ -831,7 +853,7 @@ public class TestSQLHandler extends Abst
       new HashMap();
       params.put(CommonParams.QT, "/sql");
       params.put("numWorkers", 2);
-      params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
+      params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
 
       solrStream = new SolrStream(jetty.url, params);
       tuples = getTuples(solrStream);

Modified: lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml (original)
+++ lucene/dev/trunk/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml Tue Jul 21 20:28:35 2015
@@ -857,29 +857,38 @@
      Do not change these defaults.
    -->
 
-    <requestHandler name="/export" class="solr.SearchHandler">
-        <lst name="invariants">
-            <str name="rq">{!xport}</str>
-            <str name="wt">xsort</str>
-            <str name="distrib">false</str>
-        </lst>
-
-        <arr name="components">
-            <str>query</str>
-        </arr>
-    </requestHandler>
+  <requestHandler name="/export" class="solr.SearchHandler">
+    <lst name="invariants">
+      <str name="rq">{!xport}</str>
+      <str name="wt">xsort</str>
+      <str name="distrib">false</str>
+    </lst>
+
+    <arr name="components">
+      <str>query</str>
+    </arr>
+  </requestHandler>
 
 
     <!--
     Distributed Stream processing.
     -->
 
-    <requestHandler name="/stream" class="solr.StreamHandler">
-        <lst name="invariants">
-            <str name="wt">json</str>
-            <str name="distrib">false</str>
-        </lst>
-    </requestHandler>
+  <requestHandler name="/stream" class="solr.StreamHandler">
+    <lst name="invariants">
+      <str name="wt">json</str>
+      <str name="distrib">false</str>
+    </lst>
+  </requestHandler>
+
+
+  <requestHandler name="/sql" class="solr.SQLHandler">
+    <lst name="invariants">
+      <str name="wt">json</str>
+      <str name="distrib">false</str>
+    </lst>
+  </requestHandler>
+
 
 
   <!-- A Robust Example
@@ -942,19 +951,6 @@
   </requestHandler>
 
 
-  <!--
-  Distributed Stream processing.
-  -->
-
-  <requestHandler name="/stream" class="solr.StreamHandler">
-    <lst name="invariants">
-      <str name="wt">json</str>
-      <str name="distrib">false</str>
-    </lst>
-  </requestHandler>
-
-
-
   <!-- Field Analysis Request Handler
 
        RequestHandler that provides much the same functionality as

Modified: lucene/dev/trunk/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml (original)
+++ lucene/dev/trunk/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml Tue Jul 21 20:28:35 2015
@@ -910,7 +910,12 @@
   </requestHandler>
 
 
-
+  <requestHandler name="/sql" class="solr.SQLHandler">
+    <lst name="invariants">
+      <str name="wt">json</str>
+      <str name="distrib">false</str>
+    </lst>
+  </requestHandler>
 
 
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java Tue Jul 21 20:28:35 2015
@@ -48,7 +48,7 @@ public class Tuple implements Cloneable
       EOF = true;
     }
 
-    if(fields.containsKey("_EXCEPTION_")){
+    if(fields.containsKey("EXCEPTION")){
       EXCEPTION = true;
     }
 
@@ -67,7 +67,7 @@ public class Tuple implements Cloneable
     return this.fields.get(key).toString();
   }
 
-  public String getException(){ return (String)this.fields.get("_EXCEPTION_"); }
+  public String getException(){ return (String)this.fields.get("EXCEPTION"); }
 
   public Long getLong(Object key) {
     Object o = this.fields.get(key);

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java Tue Jul 21 20:28:35 2015
@@ -289,6 +289,11 @@ public class CloudSolrStream extends Tup
       //System.out.println("Connected to zk an got cluster state.");
 
       Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+
+      if(slices == null) {
+        throw new Exception("Collection not found:"+this.collection);
+      }
+
       long time = System.currentTimeMillis();
       params.put("distrib","false"); // We are the aggregator.
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java Tue Jul 21 20:28:35 2015
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.common.SolrException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +54,9 @@ public class ExceptionStream extends Tup
     if(openException != null) {
       //There was an exception during the open.
       Map fields = new HashMap();
-      fields.put("_EXCEPTION_", openException.getMessage());
+      fields.put("EXCEPTION", openException.getMessage());
       fields.put("EOF", true);
-      log.error("Error while opening Stream", openException);
+      SolrException.log(log, openException);
       return new Tuple(fields);
     }
 
@@ -63,9 +64,9 @@ public class ExceptionStream extends Tup
       return stream.read();
     } catch (Exception e) {
       Map fields = new HashMap();
-      fields.put("_EXCEPTION_", e.getMessage());
+      fields.put("EXCEPTION", e.getMessage());
       fields.put("EOF", true);
-      log.error("Error while reading Stream:" + e);
+      SolrException.log(log, e);
       return new Tuple(fields);
     }
   }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JSONTupleStream.java Tue Jul 21 20:28:35 2015
@@ -114,6 +114,8 @@ public class JSONTupleStream {
             String val = parser.getString();
             if (key.equals(val)) {
               return true;
+            } else if("error".equals(val)) {
+              handleError();
             }
           }
           break;
@@ -135,6 +137,26 @@ public class JSONTupleStream {
       }
     }
   }
+
+  private void handleError() throws IOException { // called when the string "error" is seen; scans parser events for a "msg" string and only ever exits by throwing
+    for (;;) {
+      int event = parser.nextEvent();
+      if(event == JSONParser.STRING) {
+        String val = parser.getString();
+        if("msg".equals(val)) { // the event immediately after "msg" is examined as the candidate error text
+          event = parser.nextEvent();
+          if(event == JSONParser.STRING) {
+            String msg = parser.getString();
+            if(msg != null) {
+              throw new SolrStream.HandledException(msg); // surfaces the message text found after "msg"
+            }
+          }
+        }
+      } else if (event == JSONParser.OBJECT_END) { // an object ended before any "msg" string was thrown above
+        throw new IOException(""); // NOTE(review): the empty message carries no detail about the failure
+      }
+    }
+  }
 
   private void skipArray(String key, boolean deepSearch) throws IOException {
     for (;;) {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java Tue Jul 21 20:28:35 2015
@@ -205,14 +205,20 @@ public class ParallelStream extends Clou
       m.put("EOF", true);
       Tuple t = new Tuple(m);
 
+      /*
       Map<String, Map> metrics = new HashMap();
       Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
       while(it.hasNext()) {
         Map.Entry<String, Tuple> entry = it.next();
-        metrics.put(entry.getKey(), entry.getValue().fields);
+        if(entry.getValue().fields.size() > 1) {
+          metrics.put(entry.getKey(), entry.getValue().fields);
+        }
       }
 
-      t.setMetrics(metrics);
+      if(metrics.size() > 0) {
+        t.setMetrics(metrics);
+      }
+      */
       return t;
     }
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java Tue Jul 21 20:28:35 2015
@@ -170,6 +170,11 @@ public class RollupStream extends TupleS
       Tuple tuple = tupleStream.read();
       if(tuple.EOF) {
         if(!finished) {
+
+          if(currentMetrics == null) {
+            return tuple;
+          }
+
           Map map = new HashMap();
           for(Metric metric : currentMetrics) {
             map.put(metric.getIdentifier(), metric.getValue());

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java Tue Jul 21 20:28:35 2015
@@ -159,9 +159,9 @@ public class SolrStream extends TupleStr
         return new Tuple(m);
       } else {
 
-        String msg = (String) fields.get("_EXCEPTION_");
+        String msg = (String) fields.get("EXCEPTION");
         if (msg != null) {
-          HandledException ioException = new HandledException(this.baseUrl + ":" + msg);
+          HandledException ioException = new HandledException(msg);
           throw ioException;
         }
 
@@ -175,11 +175,10 @@ public class SolrStream extends TupleStr
         return new Tuple(fields);
       }
     } catch (HandledException e) {
-      throw e;
+      throw new IOException("--> "+this.baseUrl+":"+e.getMessage());
     } catch (Exception e) {
       //The Stream source did not provide an exception in a format that the SolrStream could propagate.
-      e.printStackTrace();
-      throw new IOException(this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
+      throw new IOException("--> "+this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
     }
   }
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java Tue Jul 21 20:28:35 2015
@@ -19,12 +19,12 @@ package org.apache.solr.client.solrj.io.
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.Writer;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
-import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
 
 public abstract class TupleStream implements Serializable {
 
@@ -33,7 +33,15 @@ public abstract class TupleStream implem
   public TupleStream() {
 
   }
-  
+
+  public static void writeStreamOpen(Writer out) throws IOException { // writes the opening text {"docs":[ to out
+    out.write("{\"docs\":[");
+  }
+
+  public static void writeStreamClose(Writer out) throws IOException { // writes the closing text ]} to out, matching the brackets opened by writeStreamOpen
+    out.write("]}");
+  }
+
   public abstract void setStreamContext(StreamContext context);
 
   public abstract List<TupleStream> children();

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java?rev=1692193&r1=1692192&r2=1692193&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java Tue Jul 21 20:28:35 2015
@@ -511,9 +511,7 @@ public class StreamingTest extends Abstr
     Tuple t = getTuple(estream);
     assert(t.EOF);
     assert(t.EXCEPTION);
-    //The /select handler does not return exceptions in tuple so the generic exception is returned.
-    assert(t.getException().contains("An exception has occurred on the server, refer to server log for details."));
-
+    assert(t.getException().contains("sort param field can't be found: blah"));
 
     //Test an error that comes originates from the /export handler
     paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
@@ -553,6 +551,18 @@ public class StreamingTest extends Abstr
     //ParallelStream requires that partitionKeys be set.
     assert(t.getException().contains("When numWorkers > 1 partitionKeys must be set."));
 
+
+    //Test an error that originates from the /select handler
+    paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s");
+    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
+    estream = new ExceptionStream(pstream);
+    t = getTuple(estream);
+    assert(t.EOF);
+    assert(t.EXCEPTION);
+    assert(t.getException().contains("sort param field can't be found: blah"));
+
+
     //Test an error that originates from the /export handler
     paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
     stream = new CloudSolrStream(zkHost, "collection1", paramsA);