You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/06/08 14:03:45 UTC

[lucene-solr] branch master updated: SOLR-14470: Add streaming expressions to /export handler.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 30924f2  SOLR-14470: Add streaming expressions to /export handler.
30924f2 is described below

commit 30924f23d6834605b9bf2d24509755ff61c4e878
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Jun 8 16:03:07 2020 +0200

    SOLR-14470: Add streaming expressions to /export handler.
---
 solr/CHANGES.txt                                   |   2 +
 .../java/org/apache/solr/handler/CatStream.java    |  21 +-
 .../org/apache/solr/handler/ExportHandler.java     |  69 +++++-
 .../java/org/apache/solr/handler/GraphHandler.java |  12 +-
 .../java/org/apache/solr/handler/SQLHandler.java   |  10 +-
 .../org/apache/solr/handler/StreamHandler.java     |  54 ++---
 .../apache/solr/handler/export/ExportWriter.java   | 246 ++++++++++++++++++++-
 .../solr/handler/export/SingleValueSortDoc.java    |   5 +
 .../org/apache/solr/handler/export/SortDoc.java    |   4 +
 .../solr/handler/export/StringFieldWriter.java     |  16 +-
 .../apache/solr/handler/export/StringValue.java    |  22 +-
 .../org/apache/solr/handler/sql/LimitStream.java   |   6 +-
 .../solr/response/GraphMLResponseWriter.java       |   4 +-
 .../src/test/org/apache/solr/core/HelloStream.java |  10 +-
 .../solr/handler/export/TestExportWriter.java      |  53 ++++-
 solr/solr-ref-guide/src/exporting-result-sets.adoc |  28 +++
 .../org/apache/solr/client/solrj/io/Tuple.java     | 154 ++++++++++---
 .../solr/client/solrj/io/eval/AnovaEvaluator.java  |  11 +-
 .../solrj/io/eval/ChiSquareDataSetEvaluator.java   |  11 +-
 .../client/solrj/io/eval/DescribeEvaluator.java    |  30 ++-
 .../solrj/io/eval/FrequencyTableEvaluator.java     |  16 +-
 .../solrj/io/eval/GTestDataSetEvaluator.java       |  11 +-
 .../client/solrj/io/eval/HistogramEvaluator.java   |  24 +-
 .../solrj/io/eval/KolmogorovSmirnovEvaluator.java  |  19 +-
 .../solrj/io/eval/MannWhitneyUEvaluator.java       |  11 +-
 .../client/solrj/io/eval/OutliersEvaluator.java    |   3 +-
 .../client/solrj/io/eval/PairedTTestEvaluator.java |   9 +-
 .../client/solrj/io/eval/RecursiveEvaluator.java   |  15 +-
 .../client/solrj/io/eval/SetValueEvaluator.java    |  10 +-
 .../solr/client/solrj/io/eval/TTestEvaluator.java  |  11 +-
 .../client/solrj/io/graph/GatherNodesStream.java   |  13 +-
 .../apache/solr/client/solrj/io/graph/Node.java    |  16 +-
 .../client/solrj/io/graph/ShortestPathStream.java  |  10 +-
 .../solr/client/solrj/io/ops/GroupOperation.java   |   9 +-
 .../client/solrj/io/stream/CalculatorStream.java   |  12 +-
 .../solr/client/solrj/io/stream/CellStream.java    |   7 +-
 .../client/solrj/io/stream/CloudSolrStream.java    |   9 +-
 .../solr/client/solrj/io/stream/CommitStream.java  |   2 +-
 .../solr/client/solrj/io/stream/CsvStream.java     |   3 +-
 .../solr/client/solrj/io/stream/DaemonStream.java  |  12 +-
 .../client/solrj/io/stream/DeepRandomStream.java   |   9 +-
 .../solr/client/solrj/io/stream/EchoStream.java    |  11 +-
 .../client/solrj/io/stream/ExceptionStream.java    |  12 +-
 .../solr/client/solrj/io/stream/Facet2DStream.java |  11 +-
 .../solr/client/solrj/io/stream/FacetStream.java   |  12 +-
 .../solrj/io/stream/FeaturesSelectionStream.java   |  20 +-
 .../solr/client/solrj/io/stream/GetStream.java     |  10 +-
 .../client/solrj/io/stream/HashRollupStream.java   |   8 +-
 .../solr/client/solrj/io/stream/JDBCStream.java    |  23 +-
 .../solr/client/solrj/io/stream/KnnStream.java     |  11 +-
 .../solr/client/solrj/io/stream/ListStream.java    |   5 +-
 .../solr/client/solrj/io/stream/ModelStream.java   |   4 +-
 .../solr/client/solrj/io/stream/NoOpStream.java    |   6 +-
 .../solr/client/solrj/io/stream/NullStream.java    |   3 +-
 .../client/solrj/io/stream/ParallelListStream.java |   7 +-
 .../client/solrj/io/stream/ParallelStream.java     |   8 +-
 .../solr/client/solrj/io/stream/PlotStream.java    |  10 +-
 .../solr/client/solrj/io/stream/RandomStream.java  |  13 +-
 .../solr/client/solrj/io/stream/RollupStream.java  |  16 +-
 .../client/solrj/io/stream/ScoreNodesStream.java   |   8 +-
 .../solr/client/solrj/io/stream/SearchStream.java  |  13 +-
 .../solr/client/solrj/io/stream/SelectStream.java  |   6 +-
 .../solrj/io/stream/SignificantTermsStream.java    |   4 +-
 .../solr/client/solrj/io/stream/SolrStream.java    |   8 +-
 .../solr/client/solrj/io/stream/StatsStream.java   |   8 +-
 .../client/solrj/io/stream/TextLogitStream.java    |  27 +--
 .../client/solrj/io/stream/TimeSeriesStream.java   |   9 +-
 .../solr/client/solrj/io/stream/TupStream.java     |  12 +-
 .../solr/client/solrj/io/stream/UpdateStream.java  |  18 +-
 .../solr/client/solrj/io/stream/ZplotStream.java   |  29 ++-
 .../client/solrj/io/stream/expr/StreamFactory.java | 234 ++++++++++----------
 .../solrj/io/stream/metrics/CountMetric.java       |   4 +-
 .../apache/solr/common/params/StreamParams.java    |  41 ++++
 .../solrj/io/stream/CloudAuthStreamTest.java       |   4 +-
 .../client/solrj/io/stream/JDBCStreamTest.java     |   4 +-
 .../solrj/io/stream/SelectWithEvaluatorsTest.java  |   4 +-
 .../solrj/io/stream/StreamDecoratorTest.java       |   6 +-
 .../solrj/io/stream/StreamExpressionTest.java      |   4 +-
 78 files changed, 999 insertions(+), 633 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 26994d6..19970d4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -94,6 +94,8 @@ New Features
 * SOLR-14476: Add percentiles and standard deviation aggregations to stats, facet and
   timeseries Streaming Expressions (Joel Bernstein)
 
+* SOLR-14470: Add streaming expressions to /export handler. (ab, Joel Bernstein)
+
 Improvements
 ---------------------
 * SOLR-14316: Remove unchecked type conversion warning in JavaBinCodec's readMapEntry's equals() method
diff --git a/solr/core/src/java/org/apache/solr/handler/CatStream.java b/solr/core/src/java/org/apache/solr/handler/CatStream.java
index 696fb89..806c94a 100644
--- a/solr/core/src/java/org/apache/solr/handler/CatStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/CatStream.java
@@ -24,7 +24,6 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -126,14 +125,14 @@ public class CatStream extends TupleStream implements Expressible {
   public Tuple read() throws IOException {
     if (maxLines >= 0 && linesReturned >= maxLines) {
       closeCurrentFileIfSet();
-      return createEofTuple();
+      return Tuple.EOF();
     } else if (currentFileHasMoreLinesToRead()) {
       return fetchNextLineFromCurrentFile();
     } else if (advanceToNextFileWithData()) {
       return fetchNextLineFromCurrentFile();
     } else { // No more data
       closeCurrentFileIfSet();
-      return createEofTuple();
+      return Tuple.EOF();
     }
   }
 
@@ -201,18 +200,10 @@ public class CatStream extends TupleStream implements Expressible {
   private Tuple fetchNextLineFromCurrentFile() {
     linesReturned++;
 
-    @SuppressWarnings({"rawtypes"})
-    HashMap m = new HashMap();
-    m.put("file", currentFilePath.displayPath);
-    m.put("line", currentFileLines.next());
-    return new Tuple(m);
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  private Tuple createEofTuple() {
-    HashMap m = new HashMap();
-    m.put("EOF", true);
-    return new Tuple(m);
+    return new Tuple(
+        "file", currentFilePath.displayPath,
+        "line", currentFileLines.next()
+    );
   }
 
   private boolean currentFileHasMoreLinesToRead() {
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index ea9239d..04800a3 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -18,20 +18,87 @@
 package org.apache.solr.handler;
 
 
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import org.apache.solr.client.solrj.io.ModelCache;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.SearchHandler;
 import org.apache.solr.handler.export.ExportWriter;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.params.CommonParams.JSON;
 
 public class ExportHandler extends SearchHandler {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private ModelCache modelCache = null;
+  private ConcurrentMap objectCache = new ConcurrentHashMap();
+  private SolrDefaultStreamFactory streamFactory = new ExportHandlerStreamFactory();
+  private String coreName;
+  private SolrClientCache solrClientCache;
+  private StreamContext initialStreamContext;
+
+  public static class ExportHandlerStreamFactory extends SolrDefaultStreamFactory {
+    static final String[] forbiddenStreams = new String[] {
+        // source streams
+        "search", "facet", "facet2D", "update", "delete", "jdbc", "topic",
+        "commit", "random", "knnSearch",
+        // execution streams
+        "parallel", "executor", "daemon"
+        // other streams?
+    };
+
+    public ExportHandlerStreamFactory() {
+      super();
+      for (String function : forbiddenStreams) {
+        this.withoutFunctionName(function);
+      }
+      this.withFunctionName("input", ExportWriter.ExportWriterStream.class);
+    }
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    super.inform(core);
+    String defaultCollection;
+    String defaultZkhost;
+    CoreContainer coreContainer = core.getCoreContainer();
+    this.solrClientCache = coreContainer.getSolrClientCache();
+    this.coreName = core.getName();
+
+    if (coreContainer.isZooKeeperAware()) {
+      defaultCollection = core.getCoreDescriptor().getCollectionName();
+      defaultZkhost = core.getCoreContainer().getZkController().getZkServerAddress();
+      streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+      streamFactory.withDefaultZkHost(defaultZkhost);
+      modelCache = new ModelCache(250,
+          defaultZkhost,
+          solrClientCache);
+    }
+    streamFactory.withSolrResourceLoader(core.getResourceLoader());
+    StreamHandler.addExpressiblePlugins(streamFactory, core);
+    initialStreamContext = new StreamContext();
+    initialStreamContext.setStreamFactory(streamFactory);
+    initialStreamContext.setSolrClientCache(solrClientCache);
+    initialStreamContext.setModelCache(modelCache);
+    initialStreamContext.setObjectCache(objectCache);
+    initialStreamContext.put("core", this.coreName);
+    initialStreamContext.put("solr-core", core);
+  }
+
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     try {
@@ -44,6 +111,6 @@ public class ExportHandler extends SearchHandler {
     Map<String, String> map = new HashMap<>(1);
     map.put(CommonParams.WT, ReplicationHandler.FILE_STREAM);
     req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map),req.getParams()));
-    rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt));
+    rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext));
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
index 0a4bd82..5c159e7 100644
--- a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
@@ -20,7 +20,6 @@ package org.apache.solr.handler;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -40,6 +39,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.PluginInfo;
@@ -206,14 +206,8 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
       return null;
     }
 
-    @SuppressWarnings({"unchecked"})
     public Tuple read() {
-      String msg = e.getMessage();
-      @SuppressWarnings({"rawtypes"})
-      Map m = new HashMap();
-      m.put("EOF", true);
-      m.put("EXCEPTION", msg);
-      return new Tuple(m);
+      return Tuple.EXCEPTION(e.getMessage(), true);
     }
   }
 
@@ -265,7 +259,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
       Tuple tuple = this.tupleStream.read();
       if(tuple.EOF) {
         long totalTime = (System.nanoTime() - begin) / 1000000;
-        tuple.fields.put("RESPONSE_TIME", totalTime);
+        tuple.put(StreamParams.RESPONSE_TIME, totalTime);
       }
       return tuple;
     }
diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
index 6b0330a..8bc1491 100644
--- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
@@ -159,7 +159,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per
       // Return a metadata tuple as the first tuple and then pass through to the JDBCStream.
       if(firstTuple) {
         try {
-          Map<String, Object> fields = new HashMap<>();
+          Tuple tuple = new Tuple();
 
           firstTuple = false;
 
@@ -173,10 +173,10 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per
           }
 
           if(includeMetadata) {
-            fields.put("isMetadata", true);
-            fields.put("fields", metadataFields);
-            fields.put("aliases", metadataAliases);
-            return new Tuple(fields);
+            tuple.put("isMetadata", true);
+            tuple.put("fields", metadataFields);
+            tuple.put("aliases", metadataAliases);
+            return tuple;
           }
         } catch (SQLException e) {
           throw new IOException(e);
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 75940eb..f1b1544 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -53,6 +53,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrConfig;
@@ -176,10 +177,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
     TupleStream tupleStream;
 
     try {
-      StreamExpression streamExpression = StreamExpressionParser.parse(params.get("expr"));
+      StreamExpression streamExpression = StreamExpressionParser.parse(params.get(StreamParams.EXPR));
       if (this.streamFactory.isEvaluator(streamExpression)) {
-        StreamExpression tupleExpression = new StreamExpression("tuple");
-        tupleExpression.addParameter(new StreamExpressionNamedParameter("return-value", streamExpression));
+        StreamExpression tupleExpression = new StreamExpression(StreamParams.TUPLE);
+        tupleExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, streamExpression));
         tupleStream = this.streamFactory.constructStream(tupleExpression);
       } else {
         tupleStream = this.streamFactory.constructStream(streamExpression);
@@ -188,7 +189,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       // Catch exceptions that occur while the stream is being created. This will include streaming expression parse
       // rules.
       SolrException.log(log, e);
-      rsp.add("result-set", new DummyErrorStream(e));
+      rsp.add(StreamParams.RESULT_SET, new DummyErrorStream(e));
 
       return;
     }
@@ -241,9 +242,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       daemonStream.setDaemons(daemons);
       daemonStream.open(); // This will start the daemonStream
       daemons.put(daemonStream.getId(), daemonStream);
-      rsp.add("result-set", new DaemonResponseStream("Daemon:" + daemonStream.getId() + " started on " + coreName));
+      rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + daemonStream.getId() + " started on " + coreName));
     } else {
-      rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
+      rsp.add(StreamParams.RESULT_SET, new TimerStream(new ExceptionStream(tupleStream)));
     }
   }
 
@@ -256,40 +257,40 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
 
     if ("list".equals(action)) {
       Collection<DaemonStream> vals = daemons.values();
-      rsp.add("result-set", new DaemonCollectionStream(vals));
+      rsp.add(StreamParams.RESULT_SET, new DaemonCollectionStream(vals));
       return;
     }
 
     String id = params.get(ID);
     DaemonStream d = daemons.get(id);
     if (d == null) {
-      rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " not found on " + coreName));
+      rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " not found on " + coreName));
       return;
     }
 
     switch (action) {
       case "stop":
         d.close();
-        rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " stopped on " + coreName));
+        rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " stopped on " + coreName));
         break;
 
       case "start":
         try {
           d.open();
         } catch (IOException e) {
-          rsp.add("result-set", new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage()));
+          rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage()));
         }
-        rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " started on " + coreName));
+        rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " started on " + coreName));
         break;
 
       case "kill":
         daemons.remove(id);
         d.close(); // we already found it in the daemons list, so we don't need to verify we removed it.
-        rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " killed on " + coreName));
+        rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " killed on " + coreName));
         break;
 
       default:
-        rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " action '"
+        rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " action '"
             + action + "' not recognized on " + coreName));
         break;
     }
@@ -344,7 +345,6 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
           .withExpression("--non-expressible--");
     }
 
-    @SuppressWarnings({"unchecked"})
     public Tuple read() {
       String msg = e.getMessage();
 
@@ -353,12 +353,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
         msg = t.getMessage();
         t = t.getCause();
       }
-
-      @SuppressWarnings({"rawtypes"})
-      Map m = new HashMap();
-      m.put("EOF", true);
-      m.put("EXCEPTION", msg);
-      return new Tuple(m);
+      return Tuple.EXCEPTION(msg, true);
     }
   }
 
@@ -396,15 +391,11 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
           .withExpression("--non-expressible--");
     }
 
-    @SuppressWarnings({"unchecked"})
     public Tuple read() {
       if (it.hasNext()) {
         return it.next().getInfo();
       } else {
-        @SuppressWarnings({"rawtypes"})
-        Map m = new HashMap();
-        m.put("EOF", true);
-        return new Tuple(m);
+        return Tuple.EOF();
       }
     }
   }
@@ -444,19 +435,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
           .withExpression("--non-expressible--");
     }
 
-    @SuppressWarnings({"unchecked"})
     public Tuple read() {
       if (sendEOF) {
-        @SuppressWarnings({"rawtypes"})
-        Map m = new HashMap();
-        m.put("EOF", true);
-        return new Tuple(m);
+        return Tuple.EOF();
       } else {
         sendEOF = true;
-        @SuppressWarnings({"rawtypes"})
-        Map m = new HashMap();
-        m.put("DaemonOp", message);
-        return new Tuple(m);
+        return new Tuple("DaemonOp", message);
       }
     }
   }
@@ -506,7 +490,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       Tuple tuple = this.tupleStream.read();
       if (tuple.EOF) {
         long totalTime = (System.nanoTime() - begin) / 1000000;
-        tuple.fields.put("RESPONSE_TIME", totalTime);
+        tuple.put(StreamParams.RESPONSE_TIME, totalTime);
       }
       return tuple;
     }
diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index adacd77..3eccd0d 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -35,12 +35,29 @@ import org.apache.lucene.search.SortField;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.MapWriter.EntryWriter;
 import org.apache.solr.common.PushWriter;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
@@ -84,23 +101,164 @@ import static org.apache.solr.common.util.Utils.makeMap;
  * once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
  */
 public class ExportWriter implements SolrCore.RawWriter, Closeable {
-  private static final int DOCUMENT_BATCH_SIZE = 30000;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int DOCUMENT_BATCH_SIZE = 30000;
+
+  private static final String EXPORT_WRITER_KEY = "__ew__";
+  private static final String SORT_DOCS_KEY = "_ew_docs_";
+  private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
+  private static final String LEAF_READERS_KEY = "_ew_leaves_";
+  private static final String SORT_QUEUE_KEY = "_ew_queue_";
+  private static final String SORT_DOC_KEY = "_ew_sort_";
+
   private OutputStreamWriter respWriter;
   final SolrQueryRequest req;
   final SolrQueryResponse res;
+  final StreamContext initialStreamContext;
+  StreamExpression streamExpression;
+  StreamContext streamContext;
   FieldWriter[] fieldWriters;
   int totalHits = 0;
   FixedBitSet[] sets = null;
   PushWriter writer;
   private String wt;
 
+  private static class TupleEntryWriter implements EntryWriter {
+    Tuple tuple;
+
+    void setTuple(Tuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @Override
+    public EntryWriter put(CharSequence k, Object v) throws IOException {
+      tuple.put(k, v);
+      return this;
+    }
+  }
+
+  public static class ExportWriterStream extends TupleStream implements Expressible {
+    StreamContext context;
+    StreamComparator streamComparator;
+    int pos = -1;
+    int outDocIndex = -1;
+    int count;
+    SortDoc sortDoc;
+    SortQueue queue;
+    SortDoc[] docs;
+    int totalHits;
+    ExportWriter exportWriter;
+    List<LeafReaderContext> leaves;
+    final TupleEntryWriter entryWriter = new TupleEntryWriter();
+
+    public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
+      streamComparator = parseComp(factory.getDefaultSort());
+    }
+
+    @Override
+    public void setStreamContext(StreamContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public List<TupleStream> children() {
+      return null;
+    }
+
+    private StreamComparator parseComp(String sort) throws IOException {
 
-  public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt) {
+      String[] sorts = sort.split(",");
+      StreamComparator[] comps = new StreamComparator[sorts.length];
+      for(int i=0; i<sorts.length; i++) {
+        String s = sorts[i];
+
+        String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
+
+        if (spec.length != 2) {
+          throw new IOException("Invalid sort spec:" + s);
+        }
+
+        String fieldName = spec[0].trim();
+        String order = spec[1].trim();
+
+        comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
+      }
+
+      if(comps.length > 1) {
+        return new MultipleFieldComparator(comps);
+      } else {
+        return comps[0];
+      }
+    }
+
+    @Override
+    public void open() throws IOException {
+      docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
+      queue = (SortQueue) context.get(SORT_QUEUE_KEY);
+      sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
+      totalHits = (Integer) context.get(TOTAL_HITS_KEY);
+      exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
+      leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
+      count = 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      exportWriter = null;
+      leaves = null;
+    }
+
+    @Override
+    public Tuple read() throws IOException {
+      if (pos < 0) {
+        if (count < totalHits) {
+          outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
+          count += (outDocIndex + 1);
+          pos = outDocIndex;
+        } else {
+          return Tuple.EOF();
+        }
+      }
+      if (pos < 0) {
+        return Tuple.EOF();
+      }
+      Tuple tuple = new Tuple();
+      entryWriter.setTuple(tuple);
+      SortDoc s = docs[pos];
+      exportWriter.writeDoc(s, leaves, entryWriter);
+      s.reset();
+      pos--;
+      return tuple;
+    }
+
+    @Override
+    public StreamComparator getStreamSort() {
+      return streamComparator;
+    }
+
+    @Override
+    public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+      StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+      return expression;
+    }
+
+    @Override
+    public Explanation toExplanation(StreamFactory factory) throws IOException {
+      return new StreamExplanation(getStreamNodeId().toString())
+          .withFunctionName("input")
+          .withImplementingClass(this.getClass().getName())
+          .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+          .withExpression("--non-expressible--");
+    }
+  }
+
+
+  public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) {
     this.req = req;
     this.res = res;
     this.wt = wt;
-
+    this.initialStreamContext = initialStreamContext;
   }
 
   @Override
@@ -216,6 +374,36 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       return;
     }
 
+    String expr = params.get(StreamParams.EXPR);
+    if (expr != null) {
+      StreamFactory streamFactory = initialStreamContext.getStreamFactory();
+      streamFactory.withDefaultSort(params.get(CommonParams.SORT));
+      try {
+        StreamExpression expression = StreamExpressionParser.parse(expr);
+        if (streamFactory.isEvaluator(expression)) {
+          streamExpression = new StreamExpression(StreamParams.TUPLE);
+          streamExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, expression));
+        } else {
+          streamExpression = expression;
+        }
+      } catch (Exception e) {
+        writeException(e, writer, true);
+        return;
+      }
+      streamContext = new StreamContext();
+      streamContext.setRequestParams(params);
+      streamContext.setLocal(true);
+
+      streamContext.workerID = 0;
+      streamContext.numWorkers = 1;
+      streamContext.setSolrClientCache(initialStreamContext.getSolrClientCache());
+      streamContext.setModelCache(initialStreamContext.getModelCache());
+      streamContext.setObjectCache(initialStreamContext.getObjectCache());
+      streamContext.put("core", req.getCore().getName());
+      streamContext.put("solr-core", req.getCore());
+      streamContext.put(CommonParams.SORT, params.get(CommonParams.SORT));
+    }
+
     writer.writeMap(m -> {
       m.put("responseHeader", singletonMap("status", 0));
       m.put("response", (MapWriter) mw -> {
@@ -223,7 +411,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
       });
     });
+    if (streamContext != null) {
+      streamContext = null;
+    }
+  }
 
+  private TupleStream createTupleStream() throws IOException {
+    StreamFactory streamFactory = (StreamFactory)initialStreamContext.getStreamFactory().clone();
+    //Set the sort in the stream factory so it can be used during initialization.
+    streamFactory.withDefaultSort(((String)streamContext.get(CommonParams.SORT)));
+    TupleStream tupleStream = streamFactory.constructStream(streamExpression);
+    tupleStream.setStreamContext(streamContext);
+    return tupleStream;
   }
 
   protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
@@ -285,22 +484,51 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
     List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
     SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
-    int count = 0;
     final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
 
     SortQueue queue = new SortQueue(queueSize, sortDoc);
     SortDoc[] outDocs = new SortDoc[queueSize];
 
-    while (count < totalHits) {
-      identifyLowestSortingUnexportedDocs(leaves, sortDoc, queue);
-      int outDocsIndex = transferBatchToArrayForOutput(queue, outDocs);
-
-      count += (outDocsIndex + 1);
-      addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
+    if (streamExpression != null) {
+      // Publish the export state in the stream context (presumably read back by the
+      // stream built from the expression - confirm against the input() stream source).
+      streamContext.put(SORT_DOCS_KEY, outDocs);
+      streamContext.put(SORT_QUEUE_KEY, queue);
+      streamContext.put(SORT_DOC_KEY, sortDoc);
+      streamContext.put(TOTAL_HITS_KEY, totalHits);
+      streamContext.put(EXPORT_WRITER_KEY, this);
+      streamContext.put(LEAF_READERS_KEY, leaves);
+      TupleStream tupleStream = createTupleStream();
+      try {
+        tupleStream.open();
+        for (;;) {
+          final Tuple t = tupleStream.read();
+          // A null tuple or an EOF tuple ends the export; neither is written out.
+          if (t == null || t.EOF) {
+            break;
+          }
+          writer.add((MapWriter) ew -> t.writeMap(ew));
+        }
+      } finally {
+        // Always release the stream, even when opening, reading or writing a tuple fails.
+        tupleStream.close();
+      }
+    } else {
+      for (int count = 0; count < totalHits; ) {
+        int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
+        count += (outDocsIndex + 1);
+        addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
+      }
     }
   }
 
-  protected void writeDoc(SortDoc sortDoc,
+  private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
+                          SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
+    identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue); // these two calls were previously inlined in the batch loop of writeDocs
+    return transferBatchToArrayForOutput(sortQueue, outDocs); // writeDocs adds 1 to this index to advance its count of exported docs
+  }
+
+  void writeDoc(SortDoc sortDoc,
                           List<LeafReaderContext> leaves,
                           EntryWriter ew) throws IOException {
 
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
index 963901c..164c07b 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java
@@ -32,6 +32,11 @@ class SingleValueSortDoc extends SortDoc {
     return null;
   }
 
+  @Override
+  public SortValue[] getSortValues() {
+    return new SortValue[] { value1 };
+  }
+
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
index 5e2c75d..292e795 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java
@@ -45,6 +45,10 @@ class SortDoc {
     return null;
   }
 
+  public SortValue[] getSortValues() {
+    return sortValues;
+  }
+
   public void setNextReader(LeafReaderContext context) throws IOException {
     this.ord = context.ord;
     this.docBase = context.docBase;
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
index c14e4d7..b82c365 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java
@@ -72,9 +72,21 @@ class StringFieldWriter extends FieldWriter {
     if (ew instanceof JavaBinCodec.BinEntryWriter) {
       ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
     } else {
-      fieldType.indexedToReadable(ref, cref);
-      String v = cref.toString();
+      String v = null;
+      if(sortValue != null) {
+        v = ((StringValue) sortValue).getLastString();
+        if(v == null) {
+          fieldType.indexedToReadable(ref, cref);
+          v = cref.toString();
+          ((StringValue) sortValue).setLastString(v);
+        }
+      } else {
+        fieldType.indexedToReadable(ref, cref);
+        v = cref.toString();
+      }
+
       ew.put(this.field, v);
+
     }
     return true;
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
index 5df4eeb..fc70565 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java
@@ -24,6 +24,7 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.MultiDocValues;
 import org.apache.lucene.index.OrdinalMap;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.LongValues;
 
 class StringValue implements SortValue {
@@ -40,6 +41,10 @@ class StringValue implements SortValue {
   protected int lastDocID;
   private boolean present;
 
+  private BytesRef lastBytes;
+  private String lastString;
+  private int lastOrd = -1;
+
   public StringValue(SortedDocValues globalDocValues, String field, IntComp comp)  {
     this.globalDocValues = globalDocValues;
     this.docValues = globalDocValues;
@@ -52,6 +57,14 @@ class StringValue implements SortValue {
     this.present = false;
   }
 
+  public String getLastString() {
+    return this.lastString;
+  }
+
+  public void setLastString(String lastString) {
+    this.lastString = lastString;
+  }
+
   public StringValue copy() {
     return new StringValue(globalDocValues, field, comp);
   }
@@ -88,7 +101,12 @@ class StringValue implements SortValue {
 
   public Object getCurrentValue() throws IOException {
     assert present == true;
-    return docValues.lookupOrd(currentOrd);
+    if (currentOrd != lastOrd) { // ordinal changed since the last lookup: refresh the cached bytes
+      lastBytes = docValues.lookupOrd(currentOrd);
+      lastOrd = currentOrd;
+      lastString = null; // drop the cached String so it is not served for a different ordinal
+    }
+    return lastBytes; // repeated calls with an unchanged ordinal reuse the previous lookup result
   }
 
   public String getField() {
@@ -109,7 +127,7 @@ class StringValue implements SortValue {
   }
 
   public int compareTo(SortValue o) {
-    StringValue sv = (StringValue)o;
+    StringValue sv = (StringValue) o;
     return comp.compare(currentOrd, sv.currentOrd);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java
index 0d4bb72..772f639 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java
@@ -26,9 +26,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 class LimitStream extends TupleStream {
 
@@ -79,9 +77,7 @@ class LimitStream extends TupleStream {
   public Tuple read() throws IOException {
     ++count;
     if(count > limit) {
-      Map<String, String> fields = new HashMap<>();
-      fields.put("EOF", "true");
-      return new Tuple(fields);
+      return Tuple.EOF();
     }
 
     return stream.read();
diff --git a/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java
index 9bb7403..926d79f 100644
--- a/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java
+++ b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java
@@ -95,9 +95,9 @@ public class GraphMLResponseWriter implements QueryResponseWriter {
         printWriter.write("<node id=\""+ xmlEscape(id)+"\"");
 
         List<String> outfields = new ArrayList();
-        Iterator<String> keys = tuple.fields.keySet().iterator();
+        Iterator<Object> keys = tuple.getFields().keySet().iterator();
         while(keys.hasNext()) {
-          String key = keys.next();
+          String key = String.valueOf(keys.next());
           if(key.equals("node") || key.equals("ancestors") || key.equals("collection")) {
             continue;
           } else {
diff --git a/solr/core/src/test/org/apache/solr/core/HelloStream.java b/solr/core/src/test/org/apache/solr/core/HelloStream.java
index be285e5..3702005 100644
--- a/solr/core/src/test/org/apache/solr/core/HelloStream.java
+++ b/solr/core/src/test/org/apache/solr/core/HelloStream.java
@@ -18,9 +18,7 @@
 package org.apache.solr.core;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -67,14 +65,10 @@ public class HelloStream extends TupleStream implements Expressible{
   @Override
   public Tuple read() throws IOException {
     if (isSentHelloWorld) {
-      Map m = new HashMap();
-      m.put("EOF", true);
-      return new Tuple(m);
+      return Tuple.EOF();
     } else {
       isSentHelloWorld = true;
-      Map m = new HashMap<>();
-      m.put("msg", "Hello World!");
-      return new Tuple(m);
+      return new Tuple("msg", "Hello World!");
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
index 4bd21fe..c033b79 100644
--- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
+++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
@@ -42,6 +42,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestExportWriter extends SolrTestCaseJ4 {
+
+  private ObjectMapper mapper = new ObjectMapper();
   
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -706,6 +708,56 @@ public class TestExportWriter extends SolrTestCaseJ4 {
     validateSort(numDocs);
   }
 
+  private void createLargeIndex() throws Exception {
+    int BATCH_SIZE = 1000;
+    int NUM_BATCHES = 100;
+    SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
+    for (int i = 0; i < NUM_BATCHES; i++) {
+      for (int j = 0; j < BATCH_SIZE; j++) {
+        docs[j] = new SolrInputDocument(
+            "id", String.valueOf(i * BATCH_SIZE + j),
+            "batch_i_p", String.valueOf(i),
+            "random_i_p", String.valueOf(random().nextInt(BATCH_SIZE)),
+            "sortabledv", TestUtil.randomSimpleString(random(), 2, 3),
+            "sortabledv_udvas", String.valueOf(random().nextInt(100)),
+            "small_i_p", String.valueOf((i + j) % 7)
+            );
+      }
+      updateJ(jsonAdd(docs), null);
+    }
+    assertU(commit());
+  }
+
+  @Test
+  public void testExpr() throws Exception {
+    assertU(delQ("*:*"));
+    assertU(commit());
+    createLargeIndex();
+    SolrQueryRequest req = req("q", "*:*", "qt", "/export", "fl", "id", "sort", "id asc", "expr", "top(n=2,input(),sort=\"id desc\")");
+    assertJQ(req,
+        "response/numFound==100000",
+        "response/docs/[0]/id=='99999'",
+        "response/docs/[1]/id=='99998'"
+        );
+    req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv_udvas", "sort", "sortabledv_udvas asc", "expr", "unique(input(),over=\"sortabledv_udvas\")");
+    String rsp = h.query(req);
+    Map<String, Object> rspMap = mapper.readValue(rsp, HashMap.class);
+    List<Map<String, Object>> docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
+    assertNotNull("missing document results: " + rspMap, docs);
+    assertEquals("wrong number of unique docs", 100, docs.size());
+    for (int i = 0; i < 100; i++) { // sortabledv_udvas is filled from nextInt(100), i.e. 0..99 inclusive
+      boolean found = false;
+      String si = String.valueOf(i);
+      for (int j = 0; j < docs.size(); j++) {
+        if (docs.get(j).get("sortabledv_udvas").equals(si)) {
+          found = true;
+          break;
+        }
+      }
+      assertTrue("missing value " + i + " in results", found);
+    }
+  }
+
   private void validateSort(int numDocs) throws Exception {
     // 10 fields
     List<String> fieldNames = new ArrayList<>(Arrays.asList("floatdv", "intdv", "stringdv", "longdv", "doubledv",
@@ -727,7 +779,6 @@ public class TestExportWriter extends SolrTestCaseJ4 {
     String fieldsStr = String.join(",", fieldStrs); // fl :  field1, field2
 
     String resp = h.query(req("q", "*:*", "qt", "/export", "fl", "id," + fieldsStr, "sort", sortStr));
-    ObjectMapper mapper = new ObjectMapper();
     HashMap respMap = mapper.readValue(resp, HashMap.class);
     List docs = (ArrayList) ((HashMap) respMap.get("response")).get("docs");
 
diff --git a/solr/solr-ref-guide/src/exporting-result-sets.adoc b/solr/solr-ref-guide/src/exporting-result-sets.adoc
index b0565d8..8a072f2 100644
--- a/solr/solr-ref-guide/src/exporting-result-sets.adoc
+++ b/solr/solr-ref-guide/src/exporting-result-sets.adoc
@@ -59,6 +59,34 @@ It can get worse otherwise.
 
 The `fl` property defines the fields that will be exported with the result set. Any of the field types that can be sorted (i.e., int, long, float, double, string, date, boolean) can be used in the field list. The fields can be single or multi-valued. However, returning scores and wildcards are not supported at this time.
 
+=== Specifying the Local Streaming Expression
+
+The optional `expr` property defines a <<streaming-expressions.adoc#streaming-expressions,stream expression>> that allows documents to be processed locally before they are exported in the result set.
+
+Expressions must use the special `input()` stream, which represents the original results from the `/export` handler. The output of the stream expression then becomes the output of the `/export` handler. The `&streamLocalOnly=true` flag is always set for this streaming expression.
+
+Only stream <<stream-decorator-reference.adoc#stream-decorator-reference,decorators>> and <<stream-evaluator-reference.adoc#stream-evaluator-reference,evaluators>> are supported in these expressions - using any of the <<stream-source-reference.adoc#stream-source-reference,source>> expressions except for the pre-defined `input()` will result in an error.
+
+Using stream expressions with the `/export` handler may result in dramatic performance improvements due to the local in-memory reduction of the number of documents to be returned.
+
+Here's an example of using the `top` decorator to return only the top N results:
+[source,text]
+----
+http://localhost:8983/solr/core_name/export?q=my-query&sort=timestamp+desc&fl=timestamp,reporter,severity&expr=top(n=2,input(),sort="timestamp+desc")
+----
+
+(Note that the sort spec in the `top` decorator must match the sort spec in the
+handler parameter).
+
+Here's an example of using the `unique` decorator:
+
+[source,text]
+----
+http://localhost:8983/solr/core_name/export?q=my-query&sort=reporter+desc&fl=reporter&expr=unique(input(),over="reporter")
+----
+
+(Note that the `over` parameter must use one of the fields requested in the `fl` parameter).
+
 == Distributed Support
 
 See the section <<streaming-expressions.adoc#streaming-expressions,Streaming Expressions>> for distributed support.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
index 56d86fe..2bdb2aa 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.params.StreamParams;
 
 /**
  *  A simple abstraction of a record containing key/value pairs.
@@ -40,28 +41,58 @@ public class Tuple implements Cloneable, MapWriter {
    *  The EOF Tuple will not contain a record from the stream, but it may contain
    *  metrics/aggregates gathered by underlying streams.
    * */
-
   public boolean EOF;
+  /**
+   * When EXCEPTION field is true the Tuple marks an exception in the stream
+   * and the corresponding "EXCEPTION" field contains a related message.
+   */
   public boolean EXCEPTION;
 
-  public Map fields = new HashMap();
+  /**
+   * Tuple fields.
+   * @deprecated use {@link #getFields()} instead of this public field.
+   */
+  public Map<Object, Object> fields = new HashMap<>(2);
+  /**
+   * External serializable field names.
+   * @deprecated use {@link #getFieldNames()} instead of this public field.
+   */
   public List<String> fieldNames;
+  /**
+   * Mapping of external field names to internal tuple field names.
+   * @deprecated use {@link #getFieldLabels()} instead of this public field.
+   */
   public Map<String, String> fieldLabels;
 
-  public Tuple(){
+  public Tuple() {
     // just an empty tuple
   }
-  
-  public Tuple(Map fields) {
-    if(fields.containsKey("EOF")) {
-      EOF = true;
-    }
 
-    if(fields.containsKey("EXCEPTION")){
-      EXCEPTION = true;
+  /**
+   * A copy constructor.
+   * @param fields map containing keys and values to be copied to this tuple
+   */
+  public Tuple(Map<?, ?> fields) {
+    for (Map.Entry<?, ?> entry : fields.entrySet()) {
+      put(entry.getKey(), entry.getValue());
     }
+  }
 
-    this.fields.putAll(fields);
+  /**
+   * Constructor that accepts an even number of arguments as key / value pairs.
+   * @param fields alternating keys and values: keys at even indices (0, 2, ...),
+   *               each followed by its value; {@code null} yields an empty tuple
+   */
+  public Tuple(Object... fields) {
+    if (fields == null) {
+      return;
+    }
+    if ((fields.length % 2) != 0) { // an odd length means the last key has no value
+      throw new IllegalArgumentException("must have a matching number of key-value pairs");
+    }
+    for (int i = 0; i < fields.length; i += 2) {
+      put(fields[i], fields[i + 1]); // put() also maintains the EOF / EXCEPTION flags
+    }
   }
 
   public Object get(Object key) {
@@ -70,9 +101,14 @@ public class Tuple implements Cloneable, MapWriter {
 
   public void put(Object key, Object value) {
     this.fields.put(key, value);
+    if (StreamParams.EOF.equals(key)) { // constant-first equals: a null key (accepted by the backing map) must not throw
+      EOF = true;
+    } else if (StreamParams.EXCEPTION.equals(key)) { // keep the public marker flags in sync with the stored keys
+      EXCEPTION = true;
+    }
   }
-  
-  public void remove(Object key){
+
+  public void remove(Object key) {
     this.fields.remove(key);
   }
 
@@ -80,16 +116,16 @@ public class Tuple implements Cloneable, MapWriter {
     return String.valueOf(this.fields.get(key));
   }
 
-  public String getException(){ return (String)this.fields.get("EXCEPTION"); }
+  public String getException() { return (String)this.fields.get(StreamParams.EXCEPTION); }
 
   public Long getLong(Object key) {
     Object o = this.fields.get(key);
 
-    if(o == null) {
+    if (o == null) {
       return null;
     }
 
-    if(o instanceof Long) {
+    if (o instanceof Long) {
       return (Long) o;
     } else if (o instanceof Number) {
       return ((Number)o).longValue();
@@ -149,11 +185,11 @@ public class Tuple implements Cloneable, MapWriter {
   public Double getDouble(Object key) {
     Object o = this.fields.get(key);
 
-    if(o == null) {
+    if (o == null) {
       return null;
     }
 
-    if(o instanceof Double) {
+    if (o instanceof Double) {
       return (Double)o;
     } else {
       //Attempt to parse the double
@@ -173,39 +209,78 @@ public class Tuple implements Cloneable, MapWriter {
     return (List<Double>)this.fields.get(key);
   }
 
+  /**
+   * Return all tuple fields and their values.
+   */
+  public Map<Object, Object> getFields() {
+    return this.fields;
+  }
+
+  /**
+   * Return all tuple fields.
+   * @deprecated use {@link #getFields()} instead.
+   */
+  @Deprecated(since = "8.6.0")
   public Map getMap() {
     return this.fields;
   }
 
+  /**
+   * This represents the mapping of external field labels to the tuple's
+   * internal field names if they are different from field names.
+   * @return field labels or null
+   */
+  public Map<String, String> getFieldLabels() {
+    return fieldLabels;
+  }
+
+  public void setFieldLabels(Map<String, String> fieldLabels) {
+    this.fieldLabels = fieldLabels;
+  }
+
+  /**
+   * A list of field names to serialize. This list (together with
+   * the mapping in {@link #getFieldLabels()}) determines what tuple values
+   * are serialized and their external (serialized) names.
+   * @return list of external field names or null
+   */
+  public List<String> getFieldNames() {
+    return fieldNames;
+  }
+
+  public void setFieldNames(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+  }
+
   public List<Map> getMaps(Object key) {
-    return (List<Map>)this.fields.get(key);
+    return (List<Map>) this.fields.get(key);
   }
 
   public void setMaps(Object key, List<Map> maps) {
     this.fields.put(key, maps);
   }
 
-  public Map<String,Map> getMetrics() {
-    return (Map<String,Map>)this.fields.get("_METRICS_");
+  public Map<String, Map> getMetrics() {
+    return (Map<String, Map>) this.fields.get(StreamParams.METRICS);
   }
 
   public void setMetrics(Map<String, Map> metrics) {
-    this.fields.put("_METRICS_", metrics);
+    this.fields.put(StreamParams.METRICS, metrics);
   }
 
   public Tuple clone() {
-    HashMap m = new HashMap(fields);
-    Tuple clone = new Tuple(m);
+    // Copy through the Map constructor: it adds every entry via put(), which re-derives the EOF / EXCEPTION flags.
+    Tuple clone = new Tuple(fields);
     return clone;
   }
   
-  public void merge(Tuple other){
-    fields.putAll(other.getMap());
+  public void merge(Tuple other) {
+    fields.putAll(other.getFields());
   }
 
   @Override
   public void writeMap(EntryWriter ew) throws IOException {
-    if(fieldNames == null) {
+    if (fieldNames == null) {
       fields.forEach((k, v) -> {
         try {
           ew.put((String) k, v);
@@ -214,10 +289,33 @@ public class Tuple implements Cloneable, MapWriter {
         }
       });
     } else {
-      for(String fieldName : fieldNames) {
+      for (String fieldName : fieldNames) {
         String label = fieldLabels.get(fieldName);
         ew.put(label, fields.get(label));
       }
     }
   }
+
+  /**
+   * Create a new empty tuple marked as EOF.
+   */
+  public static Tuple EOF() {
+    Tuple tuple = new Tuple();
+    tuple.put(StreamParams.EOF, true);
+    return tuple;
+  }
+
+  /**
+   * Create a new empty tuple marked as EXCEPTION, and optionally EOF.
+   * @param msg exception message
+   * @param eof if true the tuple will be marked as EOF
+   */
+  public static Tuple EXCEPTION(String msg, boolean eof) {
+    Tuple tuple = new Tuple();
+    tuple.put(StreamParams.EXCEPTION, msg);
+    if (eof) {
+      tuple.put(StreamParams.EOF, true);
+    }
+    return tuple;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java
index f859392..b570712 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java
@@ -18,16 +18,15 @@ package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.commons.math3.stat.inference.OneWayAnova;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 public class AnovaEvaluator extends RecursiveNumericListEvaluator implements ManyValueWorker {
   protected static final long serialVersionUID = 1L;
@@ -55,10 +54,10 @@ public class AnovaEvaluator extends RecursiveNumericListEvaluator implements Man
     OneWayAnova anova = new OneWayAnova();
     double p = anova.anovaPValue(anovaInput);
     double f = anova.anovaFValue(anovaInput);
-    Map<String,Number> m = new HashMap<>();
-    m.put("p-value", p);
-    m.put("f-ratio", f);
-    return new Tuple(m);
+    Tuple tuple = new Tuple();
+    tuple.put(StreamParams.P_VALUE, p);
+    tuple.put("f-ratio", f);
+    return tuple;
   }
   
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java
index 70131d0..26ab319 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java
@@ -18,14 +18,13 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.inference.ChiSquareTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 
 public class ChiSquareDataSetEvaluator extends RecursiveNumericListEvaluator implements TwoValueWorker {
@@ -58,10 +57,10 @@ public class ChiSquareDataSetEvaluator extends RecursiveNumericListEvaluator imp
     double chiSquare = chiSquareTest.chiSquareDataSetsComparison(sampleA, sampleB);
     double p = chiSquareTest.chiSquareTestDataSetsComparison(sampleA, sampleB);
 
-    Map<String,Number> m = new HashMap<>();
-    m.put("chisquare-statistic", chiSquare);
-    m.put("p-value", p);
-    return new Tuple(m);
+    Tuple tuple = new Tuple();
+    tuple.put("chisquare-statistic", chiSquare);
+    tuple.put(StreamParams.P_VALUE, p);
+    return tuple;
 
   }
 }
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java
index 2fce7a0..27ef0de 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java
@@ -17,10 +17,8 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -49,20 +47,20 @@ public class DescribeEvaluator extends RecursiveNumericEvaluator implements OneV
     DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
     ((List<?>)value).stream().mapToDouble(innerValue -> ((Number)innerValue).doubleValue()).forEach(innerValue -> descriptiveStatistics.addValue(innerValue));
 
-    Map<String,Number> map = new HashMap<>();
-    map.put("max", descriptiveStatistics.getMax());
-    map.put("mean", descriptiveStatistics.getMean());
-    map.put("min", descriptiveStatistics.getMin());
-    map.put("stdev", descriptiveStatistics.getStandardDeviation());
-    map.put("sum", descriptiveStatistics.getSum());
-    map.put("N", descriptiveStatistics.getN());
-    map.put("var", descriptiveStatistics.getVariance());
-    map.put("kurtosis", descriptiveStatistics.getKurtosis());
-    map.put("skewness", descriptiveStatistics.getSkewness());
-    map.put("popVar", descriptiveStatistics.getPopulationVariance());
-    map.put("geometricMean", descriptiveStatistics.getGeometricMean());
-    map.put("sumsq", descriptiveStatistics.getSumsq());
+    Tuple tuple = new Tuple();
+    tuple.put("max", descriptiveStatistics.getMax());
+    tuple.put("mean", descriptiveStatistics.getMean());
+    tuple.put("min", descriptiveStatistics.getMin());
+    tuple.put("stdev", descriptiveStatistics.getStandardDeviation());
+    tuple.put("sum", descriptiveStatistics.getSum());
+    tuple.put("N", descriptiveStatistics.getN());
+    tuple.put("var", descriptiveStatistics.getVariance());
+    tuple.put("kurtosis", descriptiveStatistics.getKurtosis());
+    tuple.put("skewness", descriptiveStatistics.getSkewness());
+    tuple.put("popVar", descriptiveStatistics.getPopulationVariance());
+    tuple.put("geometricMean", descriptiveStatistics.getGeometricMean());
+    tuple.put("sumsq", descriptiveStatistics.getSumsq());
 
-    return new Tuple(map);
+    return tuple;
   }  
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java
index efab902..2000178 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java
@@ -19,11 +19,9 @@ package org.apache.solr.client.solrj.io.eval;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.Frequency;
 
@@ -72,13 +70,13 @@ public class FrequencyTableEvaluator extends RecursiveNumericEvaluator implement
 
     while(iterator.hasNext()){
       Long value = (Long)iterator.next();
-      Map<String,Number> map = new HashMap<>();
-      map.put("value", value.longValue());
-      map.put("count", frequency.getCount(value));
-      map.put("cumFreq", frequency.getCumFreq(value));
-      map.put("cumPct", frequency.getCumPct(value));
-      map.put("pct", frequency.getPct(value));
-      histogramBins.add(new Tuple(map));
+      Tuple tuple = new Tuple();
+      tuple.put("value", value.longValue());
+      tuple.put("count", frequency.getCount(value));
+      tuple.put("cumFreq", frequency.getCumFreq(value));
+      tuple.put("cumPct", frequency.getCumPct(value));
+      tuple.put("pct", frequency.getPct(value));
+      histogramBins.add(tuple);
     }
     return histogramBins;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java
index 6aa5d8a..e83b4f9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java
@@ -18,14 +18,13 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.inference.GTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 
 public class GTestDataSetEvaluator extends RecursiveNumericListEvaluator implements TwoValueWorker {
@@ -58,9 +57,9 @@ public class GTestDataSetEvaluator extends RecursiveNumericListEvaluator impleme
     double g = gTest.gDataSetsComparison(sampleA, sampleB);
     double p = gTest.gTestDataSetsComparison(sampleA, sampleB);
 
-    Map<String,Number> m = new HashMap<>();
-    m.put("G-statistic", g);
-    m.put("p-value", p);
-    return new Tuple(m);
+   Tuple tuple = new Tuple();
+    tuple.put("G-statistic", g);
+    tuple.put(StreamParams.P_VALUE, p);
+    return tuple;
   }
 }
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java
index 8d27614..fd6fcf6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java
@@ -19,10 +19,8 @@ package org.apache.solr.client.solrj.io.eval;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.random.EmpiricalDistribution;
 import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
@@ -71,17 +69,17 @@ public class HistogramEvaluator extends RecursiveNumericEvaluator implements Man
 
     List<Tuple> histogramBins = new ArrayList<>();
     for(SummaryStatistics binSummary : distribution.getBinStats()) {
-      Map<String,Number> map = new HashMap<>();
-      map.put("max", binSummary.getMax());
-      map.put("mean", binSummary.getMean());
-      map.put("min", binSummary.getMin());
-      map.put("stdev", binSummary.getStandardDeviation());
-      map.put("sum", binSummary.getSum());
-      map.put("N", binSummary.getN());
-      map.put("var", binSummary.getVariance());
-      map.put("cumProb", distribution.cumulativeProbability(binSummary.getMean()));
-      map.put("prob", distribution.probability(binSummary.getMin(), binSummary.getMax()));
-      histogramBins.add(new Tuple(map));
+      Tuple tuple = new Tuple();
+      tuple.put("max", binSummary.getMax());
+      tuple.put("mean", binSummary.getMean());
+      tuple.put("min", binSummary.getMin());
+      tuple.put("stdev", binSummary.getStandardDeviation());
+      tuple.put("sum", binSummary.getSum());
+      tuple.put("N", binSummary.getN());
+      tuple.put("var", binSummary.getVariance());
+      tuple.put("cumProb", distribution.cumulativeProbability(binSummary.getMean()));
+      tuple.put("prob", distribution.probability(binSummary.getMin(), binSummary.getMax()));
+      histogramBins.add(tuple);
     }
     
     return histogramBins;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java
index 58e783e..27256b1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java
@@ -17,16 +17,15 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.distribution.RealDistribution;
 import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 public class KolmogorovSmirnovEvaluator extends RecursiveObjectEvaluator implements TwoValueWorker {
 
@@ -54,17 +53,17 @@ public class KolmogorovSmirnovEvaluator extends RecursiveObjectEvaluator impleme
     if(first instanceof RealDistribution){
       RealDistribution realDistribution = (RealDistribution)first;
 
-      Map<String,Double> m = new HashMap<>();
-      m.put("p-value", ks.kolmogorovSmirnovTest(realDistribution, data));
-      m.put("d-statistic", ks.kolmogorovSmirnovStatistic(realDistribution, data));
-      return new Tuple(m);
+      Tuple tuple = new Tuple();
+      tuple.put(StreamParams.P_VALUE, ks.kolmogorovSmirnovTest(realDistribution, data));
+      tuple.put("d-statistic", ks.kolmogorovSmirnovStatistic(realDistribution, data));
+      return tuple;
     }
     else if(first instanceof List<?> && ((List<?>) first).stream().noneMatch(item -> !(item instanceof Number))){
       double[] data2 = ((List<?>)first).stream().mapToDouble(item -> ((Number)item).doubleValue()).toArray();
-      
-      Map<String,Double> m = new HashMap<>();
-      m.put("d-statistic", ks.kolmogorovSmirnovTest(data, data2));
-      return new Tuple(m);
+
+      Tuple tuple = new Tuple();
+      tuple.put("d-statistic", ks.kolmogorovSmirnovTest(data, data2));
+      return tuple;
     }
     else{
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the first value, expecting a RealDistribution or list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java
index d8cb214..6c6e278 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java
@@ -19,16 +19,15 @@ package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.commons.math3.stat.inference.MannWhitneyUTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 
 public class MannWhitneyUEvaluator extends RecursiveNumericListEvaluator implements ManyValueWorker {
@@ -52,10 +51,10 @@ public class MannWhitneyUEvaluator extends RecursiveNumericListEvaluator impleme
       MannWhitneyUTest mannwhitneyutest = new MannWhitneyUTest();
       double u = mannwhitneyutest.mannWhitneyU(mannWhitneyUInput.get(0), mannWhitneyUInput.get(1));
       double p = mannwhitneyutest.mannWhitneyUTest(mannWhitneyUInput.get(0), mannWhitneyUInput.get(1));
-      Map<String,Number> m = new HashMap<>();
-      m.put("u-statistic", u);
-      m.put("p-value", p);
-      return new Tuple(m);
+      Tuple tuple = new Tuple();
+      tuple.put("u-statistic", u);
+      tuple.put(StreamParams.P_VALUE, p);
+      return tuple;
     }else{
       throw new IOException(String.format(Locale.ROOT,"%s(...) only works with a list of 2 arrays but a list of %d array(s) was provided.", constructingFactory.getFunctionName(getClass()), mannWhitneyUInput.size()));
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java
index 0f2474d..ac6b854 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.math3.distribution.IntegerDistribution;
@@ -77,7 +76,7 @@ public class OutliersEvaluator extends RecursiveObjectEvaluator implements ManyV
     } else {
       tuples = new ArrayList<>();
       for(int i=0; i<vec.size(); i++) {
-        tuples.add(new Tuple(new HashMap()));
+        tuples.add(new Tuple());
       }
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/PairedTTestEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/PairedTTestEvaluator.java
index 371f90f..fc865db 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/PairedTTestEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/PairedTTestEvaluator.java
@@ -17,15 +17,14 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.inference.TTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 public class PairedTTestEvaluator extends RecursiveNumericListEvaluator implements TwoValueWorker {
   protected static final long serialVersionUID = 1L;
@@ -42,9 +41,7 @@ public class PairedTTestEvaluator extends RecursiveNumericListEvaluator implemen
   public Object doWork(Object value1, Object value2) throws IOException {
 
     TTest tTest = new TTest();
-    @SuppressWarnings({"rawtypes"})
-    Map map = new HashMap();
-    Tuple tuple = new Tuple(map);
+    Tuple tuple = new Tuple();
     if(value1 instanceof List) {
       @SuppressWarnings({"unchecked"})
       List<Number> values1 = (List<Number>)value1;
@@ -66,7 +63,7 @@ public class PairedTTestEvaluator extends RecursiveNumericListEvaluator implemen
         double tstat = tTest.pairedT(samples1, samples2);
         double pval = tTest.pairedTTest(samples1, samples2);
         tuple.put("t-statistic", tstat);
-        tuple.put("p-value", pval);
+        tuple.put(StreamParams.P_VALUE, pval);
         return tuple;
       } else {
         throw new IOException("Second parameter for pairedTtest must be a double array");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java
index 30455be..04f987a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java
@@ -22,8 +22,6 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -119,7 +117,6 @@ public abstract class RecursiveEvaluator implements StreamEvaluator, ValueWorker
 
   }
   
-  @SuppressWarnings({"unchecked"})
   protected Object normalizeOutputType(Object value) {
     if(null == value){
       return null;
@@ -145,15 +142,13 @@ public abstract class RecursiveEvaluator implements StreamEvaluator, ValueWorker
       //If its a tuple and not a inner class that has extended tuple, which is done in a number of cases so that mathematical models
       //can be contained within a tuple.
 
-      @SuppressWarnings({"rawtypes"})
       Tuple tuple = (Tuple)value;
-      @SuppressWarnings({"rawtypes"})
-      Map map = new HashMap();
-      for(Object o : tuple.fields.keySet()) {
-        Object v = tuple.fields.get(o);
-        map.put(o, normalizeOutputType(v));
+      Tuple newTuple = new Tuple();
+      for(Object o : tuple.getFields().keySet()) {
+        Object v = tuple.get(o);
+        newTuple.put(o, normalizeOutputType(v));
       }
-      return new Tuple(map);
+      return newTuple;
     }
     else{
       // anything else can just be returned as is
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java
index fb85e8d..c56ecc2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java
@@ -19,8 +19,6 @@ package org.apache.solr.client.solrj.io.eval;
 import java.io.IOException;
 
 import java.util.Locale;
-import java.util.Map;
-import java.util.HashMap;
 
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -38,7 +36,6 @@ public class SetValueEvaluator extends RecursiveObjectEvaluator implements ManyV
   }
 
   @Override
-  @SuppressWarnings({"unchecked"})
   public Object doWork(Object... values) throws IOException {
     if(values[0] instanceof Tuple) {
       Tuple tuple = (Tuple)values[0];
@@ -48,10 +45,9 @@ public class SetValueEvaluator extends RecursiveObjectEvaluator implements ManyV
         value = ((String)value).replace("\"", "");
       }
       key = key.replace("\"", "");
-      @SuppressWarnings({"rawtypes"})
-      Map map = new HashMap(tuple.fields);
-      map.put(key, value);
-      return new Tuple(map);
+      Tuple newTuple = tuple.clone();
+      newTuple.put(key, value);
+      return newTuple;
     } else {
       throw new IOException("The setValue function expects a Tuple as the first parameter");
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java
index dc8b37a..acc5e80 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java
@@ -17,15 +17,14 @@
 package org.apache.solr.client.solrj.io.eval;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.commons.math3.stat.inference.TTest;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValueWorker {
   protected static final long serialVersionUID = 1L;
@@ -42,9 +41,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu
   public Object doWork(Object value1, Object value2) throws IOException {
 
     TTest tTest = new TTest();
-    @SuppressWarnings({"rawtypes"})
-    Map map = new HashMap();
-    Tuple tuple = new Tuple(map);
+    Tuple tuple = new Tuple();
     if(value1 instanceof Number) {
       double mean = ((Number) value1).doubleValue();
 
@@ -60,7 +57,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu
         double pval = tTest.tTest(mean, samples);
 
         tuple.put("t-statistic", tstat);
-        tuple.put("p-value", pval);
+        tuple.put(StreamParams.P_VALUE, pval);
         return tuple;
       } else {
         throw new IOException("Second parameter for ttest must be a double array");
@@ -87,7 +84,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu
         double tstat = tTest.t(samples1, samples2);
         double pval = tTest.tTest(samples1, samples2);
         tuple.put("t-statistic", tstat);
-        tuple.put("p-value", pval);
+        tuple.put(StreamParams.P_VALUE, pval);
         return tuple;
       } else {
         throw new IOException("Second parameter for ttest must be a double array");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
index 5796065..702d034 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java
@@ -613,10 +613,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
     if (out.hasNext()) {
       return out.next();
     } else {
-      Map map = new HashMap();
-      map.put("EOF", true);
-      Tuple tuple = new Tuple(map);
-      return tuple;
+      return Tuple.EOF();
     }
   }
 
@@ -645,14 +642,10 @@ public class GatherNodesStream extends TupleStream implements Expressible {
     public void setStreamContext(StreamContext context) {}
 
     public Tuple read() {
-      HashMap map = new HashMap();
       if(it.hasNext()) {
-        map.put("node",it.next());
-        return new Tuple(map);
+        return new Tuple("node",it.next());
       } else {
-
-        map.put("EOF", true);
-        return new Tuple(map);
+        return Tuple.EOF();
       }
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
index befa5a7..7c99f75 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java
@@ -51,12 +51,12 @@ public class Node {
   }
 
   public Tuple toTuple(String collection, String field, int level, Traversal traversal) {
-    Map map = new HashMap();
+    Tuple tuple = new Tuple();
 
-    map.put("node", id);
-    map.put("collection", collection);
-    map.put("field", field);
-    map.put("level", level);
+    tuple.put("node", id);
+    tuple.put("collection", collection);
+    tuple.put("field", field);
+    tuple.put("level", level);
 
     boolean prependCollection = traversal.isMultiCollection();
     List<String> cols = traversal.getCollections();
@@ -76,15 +76,15 @@ public class Node {
         }
       }
 
-      map.put("ancestors", l);
+      tuple.put("ancestors", l);
     }
 
     if(metrics != null) {
       for(Metric metric : metrics) {
-        map.put(metric.getIdentifier(), metric.getValue());
+        tuple.put(metric.getIdentifier(), metric.getValue());
       }
     }
 
-    return new Tuple(map);
+    return tuple;
   }
 }
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
index 314ab92..9d12e48 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
@@ -403,8 +403,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
         for(LinkedList p : paths) {
           String s = p.toString();
           if (!finalPaths.contains(s)){
-            Tuple shortestPath = new Tuple(new HashMap());
-            shortestPath.put("path", p);
+            Tuple shortestPath = new Tuple("path", p);
             shortestPaths.add(shortestPath);
             finalPaths.add(s);
           }
@@ -501,12 +500,11 @@ public class ShortestPathStream extends TupleStream implements Expressible {
       Tuple t = shortestPaths.removeFirst();
       return t;
     } else {
-      Map m = new HashMap();
-      m.put("EOF", true);
+      Tuple tuple = Tuple.EOF();
       if(!found) {
-        m.put("sorry", "No path found");
+        tuple.put("sorry", "No path found");
       }
-      return new Tuple(m);
+      return tuple;
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java
index 3db76ec..59d83c5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -108,15 +107,15 @@ public class GroupOperation implements ReduceOperation {
   public Tuple reduce() {
     LinkedList ll = new LinkedList();
     while(priorityQueue.size() > 0) {
-      ll.addFirst(priorityQueue.poll().getMap());
+      ll.addFirst(priorityQueue.poll().getFields());
       //This will clear priority queue and so it will be ready for the next group.
     }
 
     List<Map> list = new ArrayList(ll);
     Map groupHead = list.get(0);
-    Map map = new HashMap(groupHead);
-    map.put("group", list);
-    return new Tuple(map);
+    Tuple tuple = new Tuple(groupHead);
+    tuple.put("group", list);
+    return tuple;
   }
 
   public void operate(Tuple tuple) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java
index 49b5953..d2efbb2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -86,16 +85,11 @@ public class CalculatorStream extends TupleStream implements Expressible {
 
   public Tuple read() throws IOException {
 
-    if(finished) {
-      HashMap m = new HashMap();
-      m.put("EOF", true);
-      Tuple tuple = new Tuple(m);
-      return tuple;
+    if (finished) {
+      return Tuple.EOF();
     } else {
-      HashMap m = new HashMap();
-      Tuple tuple = new Tuple(m);
       finished = true;
-      return tuple;
+      return new Tuple();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
index a87c9ee..fac8ca8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
@@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -134,9 +132,8 @@ public class CellStream extends TupleStream implements Expressible {
         }
       }
 
-      Map map = new HashMap();
-      map.put(name, list);
-      tuple = new Tuple(map);
+      tuple = new Tuple();
+      tuple.put(name, list);
     } finally {
       stream.close();
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 95cf239..dfa0211 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -453,14 +453,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
       }
       return t;
     } else {
-      Map m = new HashMap();
+      Tuple tuple = Tuple.EOF();
       if(trace) {
-        m.put("_COLLECTION_", this.collection);
+        tuple.put("_COLLECTION_", this.collection);
       }
-
-      m.put("EOF", true);
-
-      return new Tuple(m);
+      return tuple;
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
index b29ea09..5885862 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
@@ -123,7 +123,7 @@ public class CommitStream extends TupleStream implements Expressible {
       // if the read document contains field 'batchIndexed' then it's a summary
       // document and we can update our count based on it's value. If not then 
       // just increment by 1
-      if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){
+      if(tuple.getFields().containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){
         docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME));
       }
       else{
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java
index 561204f..386cb5d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 
@@ -130,7 +129,7 @@ public class CsvStream extends TupleStream implements Expressible {
         if(fields.length != headers.length) {
           throw new IOException("Headers and lines must have the same number of fields [file:"+file+" line number:"+lineNumber+"]");
         }
-        Tuple out = new Tuple(new HashMap());
+        Tuple out = new Tuple();
         out.put("id", file+"_"+lineNumber);
         for(int i=0; i<headers.length; i++) {
           if(fields[i] != null && fields[i].length() > 0) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 0257be9..962b61a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -21,7 +21,6 @@ import java.lang.Thread.State;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -252,13 +251,13 @@ public class DaemonStream extends TupleStream implements Expressible {
   }
 
   public synchronized Tuple getInfo() {
-    Tuple tuple = new Tuple(new HashMap());
+    Tuple tuple = new Tuple();
     tuple.put(ID, id);
     tuple.put("startTime", startTime);
     tuple.put("stopTime", stopTime);
     tuple.put("iterations", iterations.get());
     tuple.put("state", streamRunner.getState().toString());
-    if(exception != null) {
+    if (exception != null) {
       tuple.put("exception", exception.getMessage());
     }
 
@@ -338,7 +337,7 @@ public class DaemonStream extends TupleStream implements Expressible {
               Tuple tuple = tupleStream.read();
               if (tuple.EOF) {
                 errors = 0; // Reset errors on successful run.
-                if (tuple.fields.containsKey("sleepMillis")) {
+                if (tuple.getFields().containsKey("sleepMillis")) {
                   this.sleepMillis = tuple.getLong("sleepMillis");
 
                   if(terminate && sleepMillis > 0) {
@@ -400,11 +399,8 @@ public class DaemonStream extends TupleStream implements Expressible {
       }
 
       if(!eatTuples) {
-        Map m = new HashMap();
-        m.put("EOF", true);
-        Tuple tuple = new Tuple(m);
         try {
-          queue.put(tuple);
+          queue.put(Tuple.EOF());
         } catch (InterruptedException e) {
           log.error("Error in DaemonStream:{}", id, e);
         }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index 9c9c201..1d53604 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -403,14 +403,11 @@ public class DeepRandomStream extends TupleStream implements Expressible {
       }
       return t;
     } else {
-      Map m = new HashMap();
+      Tuple tuple = Tuple.EOF();
       if(trace) {
-        m.put("_COLLECTION_", this.collection);
+        tuple.put("_COLLECTION_", this.collection);
       }
-
-      m.put("EOF", true);
-
-      return new Tuple(m);
+      return tuple;
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java
index 38e1cca..7749a0f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -96,16 +95,10 @@ public class EchoStream extends TupleStream implements Expressible {
   public Tuple read() throws IOException {
 
     if(finished) {
-      HashMap m = new HashMap();
-      m.put("EOF", true);
-      Tuple tuple = new Tuple(m);
-      return tuple;
+      return Tuple.EOF();
     } else {
-      HashMap m = new HashMap();
-      m.put("echo", echo);
-      Tuple tuple = new Tuple(m);
       finished = true;
-      return tuple;
+      return new Tuple("echo", echo);
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java
index 9d1f450..d6cabf1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java
@@ -18,9 +18,7 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -60,21 +58,15 @@ public class ExceptionStream extends TupleStream {
   public Tuple read() {
     if(openException != null) {
       //There was an exception during the open.
-      Map fields = new HashMap();
-      fields.put("EXCEPTION", openException.getMessage());
-      fields.put("EOF", true);
       SolrException.log(log, openException);
-      return new Tuple(fields);
+      return Tuple.EXCEPTION(openException.getMessage(), true);
     }
 
     try {
       return stream.read();
     } catch (Exception e) {
-      Map fields = new HashMap();
-      fields.put("EXCEPTION", e.getMessage());
-      fields.put("EOF", true);
       SolrException.log(log, e);
-      return new Tuple(fields);
+      return Tuple.EXCEPTION(e.getMessage(), true);
     }
   }
   
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
index 2ccb147..7f3d635 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
@@ -20,11 +20,9 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -308,11 +306,8 @@ public class Facet2DStream extends TupleStream implements Expressible {
     if (out.hasNext()) {
       return out.next();
     } else {
-      Map fields = new HashMap();
-      fields.put("rows", tuples.size());
-
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
+      Tuple tuple = Tuple.EOF();
+      tuple.put("rows", tuples.size());
       return tuple;
     }
 
@@ -395,7 +390,7 @@ public class Facet2DStream extends TupleStream implements Expressible {
   }
 
   private void getTuples(NamedList response, Bucket x, Bucket y, Metric metric) {
-    Tuple tuple = new Tuple(new HashMap());
+    Tuple tuple = new Tuple();
     NamedList facets = (NamedList) response.get("facets");
     fillTuples(0, tuples, tuple, facets, x, y, metric);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 6e96cfd..29c48ae 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -20,10 +20,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -626,15 +624,11 @@ public class FacetStream extends TupleStream implements Expressible  {
       ++index;
       return tuple;
     } else {
-      Map fields = new HashMap();
+      Tuple tuple = Tuple.EOF();
 
       if(bucketSizeLimit == Integer.MAX_VALUE) {
-        fields.put("totalRows", tuples.size());
+        tuple.put("totalRows", tuples.size());
       }
-
-      fields.put("EOF", true);
-
-      Tuple tuple = new Tuple(fields);
       return tuple;
     }
   }
@@ -771,7 +765,7 @@ public class FacetStream extends TupleStream implements Expressible  {
                                 Bucket[] buckets,
                                 Metric[] metrics) {
 
-    Tuple tuple = new Tuple(new HashMap());
+    Tuple tuple = new Tuple();
     NamedList facets = (NamedList)response.get("facets");
     fillTuples(0,
                tuples,
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index a9963d0..041e7c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -362,21 +362,19 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
         for (Map.Entry<String, Double> termScore : termScores.entrySet()) {
           if (tuples.size() == numTerms) break;
           index++;
-          Map map = new HashMap();
-          map.put(ID, featureSet + "_" + index);
-          map.put("index_i", index);
-          map.put("term_s", termScore.getKey());
-          map.put("score_f", termScore.getValue());
-          map.put("featureSet_s", featureSet);
+          Tuple tuple = new Tuple();
+          tuple.put(ID, featureSet + "_" + index);
+          tuple.put("index_i", index);
+          tuple.put("term_s", termScore.getKey());
+          tuple.put("score_f", termScore.getValue());
+          tuple.put("featureSet_s", featureSet);
           long docFreq = docFreqs.get(termScore.getKey());
           double d = Math.log(((double)numDocs / (double)(docFreq + 1)));
-          map.put("idf_d", d);
-          tuples.add(new Tuple(map));
+          tuple.put("idf_d", d);
+          tuples.add(tuple);
         }
 
-        Map map = new HashMap();
-        map.put("EOF", true);
-        tuples.add(new Tuple(map));
+        tuples.add(Tuple.EOF());
 
         tupleIterator = tuples.iterator();
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
index 1655bfb..8b82761 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -89,14 +88,11 @@ public class GetStream extends TupleStream implements Expressible {
   }
 
   public Tuple read() throws IOException {
-    Map map = new HashMap();
-    if(tupleIterator.hasNext()) {
+    if (tupleIterator.hasNext()) {
       Tuple t = tupleIterator.next();
-      map.putAll(t.fields);
-      return new Tuple(map);
+      return t.clone();
     } else {
-      map.put("EOF", true);
-      return new Tuple(map);
+      return Tuple.EOF();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java
index 8bf82c6..0754692 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java
@@ -199,18 +199,16 @@ public class HashRollupStream extends TupleStream implements Expressible {
         if (tuple.EOF) {
           List tuples = new ArrayList();
           for(Map.Entry<HashKey, Metric[]> entry : metricMap.entrySet()) {
-            Map<String, Object> map = new HashMap<String, Object>();
+            Tuple t = new Tuple();
             Metric[] finishedMetrics = entry.getValue();
             for (Metric metric : finishedMetrics) {
-              map.put(metric.getIdentifier(), metric.getValue());
+              t.put(metric.getIdentifier(), metric.getValue());
             }
 
             HashKey hashKey = entry.getKey();
             for (int i = 0; i < buckets.length; i++) {
-              map.put(buckets[i].toString(), hashKey.getParts()[i]);
+              t.put(buckets[i].toString(), hashKey.getParts()[i]);
             }
-
-            Tuple t = new Tuple(map);
             tuples.add(t);
           }
           tuples.add(tuple);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
index ea40715..778f60c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
@@ -31,11 +31,9 @@ import java.sql.Timestamp;
 import java.sql.Types;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -50,6 +48,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.StreamParams;
 
 import static org.apache.solr.common.params.CommonParams.SORT;
 
@@ -515,22 +514,20 @@ public class JDBCStream extends TupleStream implements Expressible {
   
   public Tuple read() throws IOException {
     
-    try{
-      Map<Object,Object> fields = new HashMap<>();
-      if(resultSet.next()){
+    try {
+      Tuple tuple = new Tuple();
+      if (resultSet.next()) {
         // we have a record
-        for(ResultSetValueSelector selector : valueSelectors){
-          fields.put(selector.getColumnName(), selector.selectValue(resultSet));
+        for (ResultSetValueSelector selector : valueSelectors) {
+          tuple.put(selector.getColumnName(), selector.selectValue(resultSet));
         }
-      }
-      else{
+      } else {
         // we do not have a record
-        fields.put("EOF", true);
+        tuple.put(StreamParams.EOF, true);
       }
       
-      return new Tuple(fields);
-    }
-    catch(SQLException e){
+      return tuple;
+    } catch (SQLException e) {
       throw new IOException(String.format(Locale.ROOT, "Failed to read next record with error '%s'", e.getMessage()), e);
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
index c03db38..39c9cd6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
@@ -225,17 +225,14 @@ public class KnnStream extends TupleStream implements Expressible  {
 
   public Tuple read() throws IOException {
     if(documentIterator.hasNext()) {
-      Map map = new HashMap();
+      Tuple tuple = new Tuple();
       SolrDocument doc = documentIterator.next();
       for(Entry<String, Object> entry : doc.entrySet()) {
-        map.put(entry.getKey(), entry.getValue());
+        tuple.put(entry.getKey(), entry.getValue());
       }
-      return new Tuple(map);
-    } else {
-      Map fields = new HashMap();
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
       return tuple;
+    } else {
+      return Tuple.EOF();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
index 33f8fd5..3858df5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -114,9 +113,7 @@ public class ListStream extends TupleStream implements Expressible {
           streams[streamIndex] = null;
           currentStream.open();
         } else {
-          HashMap map = new HashMap();
-          map.put("EOF", true);
-          return new Tuple(map);
+          return Tuple.EOF();
         }
       }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
index ffaf313..f13e736 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
@@ -193,9 +193,7 @@ public class ModelStream extends TupleStream implements Expressible {
       tuple = model;
       model = null;
     } else {
-      Map map = new HashMap();
-      map.put("EOF", true);
-      tuple = new Tuple(map);
+      tuple = Tuple.EOF();
     }
 
     return tuple;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java
index 8d55c31..85a0f55 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -88,10 +87,7 @@ public class NoOpStream extends TupleStream implements Expressible {
   }
 
   public Tuple read() throws IOException {
-      HashMap m = new HashMap();
-      m.put("EOF", true);
-      Tuple tuple = new Tuple(m);
-      return tuple;
+    return Tuple.EOF();
   }
 
   /** Return the stream sort - ie, the order in which records are returned */
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java
index 4f0181b..067acb5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Date;
 
@@ -133,7 +132,7 @@ public class NullStream extends TupleStream implements Expressible {
       if(tuple.EOF) {
         eof = tuple;
         long end = new Date().getTime();
-        Tuple t = new Tuple(new HashMap());
+        Tuple t = new Tuple();
         t.put("nullCount", count);
         t.put("timer", end-start);
         return t;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
index aeadd90..dbf2901 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -19,7 +19,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -107,14 +106,12 @@ public class ParallelListStream extends TupleStream implements Expressible {
   }
 
   public Tuple read() throws IOException {
-    while(true) {
+    while (true) {
       if (currentStream == null) {
         if (streamIndex < streams.length) {
           currentStream = streams[streamIndex];
         } else {
-          HashMap map = new HashMap();
-          map.put("EOF", true);
-          return new Tuple(map);
+          return Tuple.EOF();
         }
       }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index d81aa54..141411e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -217,10 +215,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
     Tuple tuple = _read();
 
     if(tuple.EOF) {
-      Map m = new HashMap();
-      m.put("EOF", true);
-      Tuple t = new Tuple(m);
-
       /*
       Map<String, Map> metrics = new HashMap();
       Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
@@ -235,7 +229,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
         t.setMetrics(metrics);
       }
       */
-      return t;
+      return Tuple.EOF();
     }
 
     return tuple;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java
index a83349a..142e3d0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java
@@ -150,10 +150,8 @@ public class PlotStream extends TupleStream implements Expressible {
 
   public Tuple read() throws IOException {
 
-    if(finished) {
-      Map<String,Object> m = new HashMap<>();
-      m.put("EOF", true);
-      return new Tuple(m);
+    if (finished) {
+      return Tuple.EOF();
     } else {
       finished = true;
       Map<String, Object> values = new HashMap<>();
@@ -197,8 +195,8 @@ public class PlotStream extends TupleStream implements Expressible {
       values.put("data", xy);
 
       Tuple tup = new Tuple(values);
-      tup.fieldLabels = fieldLabels;
-      tup.fieldNames = fieldNames;
+      tup.setFieldLabels(fieldLabels);
+      tup.setFieldNames(fieldNames);
       return tup;
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index aca0e3d..20a055c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -234,25 +234,22 @@ public class RandomStream extends TupleStream implements Expressible  {
 
   public Tuple read() throws IOException {
     if(documentIterator.hasNext()) {
-      Map map = new HashMap();
+      Tuple tuple = new Tuple();
       SolrDocument doc = documentIterator.next();
 
       // Put the generated x-axis first. If there really is an x field it will overwrite it.
       if(outputX) {
-        map.put("x", x++);
+        tuple.put("x", x++);
       }
 
       for(Entry<String, Object> entry : doc.entrySet()) {
-        map.put(entry.getKey(), entry.getValue());
+        tuple.put(entry.getKey(), entry.getValue());
       }
 
 
-      return new Tuple(map);
-    } else {
-      Map fields = new HashMap();
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
       return tuple;
+    } else {
+      return Tuple.EOF();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
index c1b6894..cdd8641 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java
@@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.HashKey;
@@ -206,15 +204,14 @@ public class RollupStream extends TupleStream implements Expressible {
             return tuple;
           }
 
-          Map<String,Object> map = new HashMap<String,Object>();
+          Tuple t = new Tuple();
           for(Metric metric : currentMetrics) {
-            map.put(metric.getIdentifier(), metric.getValue());
+            t.put(metric.getIdentifier(), metric.getValue());
           }
 
           for(int i=0; i<buckets.length; i++) {
-            map.put(buckets[i].toString(), currentKey.getParts()[i]);
+            t.put(buckets[i].toString(), currentKey.getParts()[i]);
           }
-          Tuple t = new Tuple(map);
           tupleStream.pushBack(tuple);
           finished = true;
           return t;
@@ -237,15 +234,14 @@ public class RollupStream extends TupleStream implements Expressible {
       } else {
         Tuple t = null;
         if(currentMetrics != null) {
-          Map<String,Object> map = new HashMap<String,Object>();
+          t = new Tuple();
           for(Metric metric : currentMetrics) {
-            map.put(metric.getIdentifier(), metric.getValue());
+            t.put(metric.getIdentifier(), metric.getValue());
           }
 
           for(int i=0; i<buckets.length; i++) {
-            map.put(buckets[i].toString(), currentKey.getParts()[i]);
+            t.put(buckets[i].toString(), currentKey.getParts()[i]);
           }
-          t = new Tuple(map);
         }
 
         currentKey = hashKey;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
index a7bb15d..95aa700 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
@@ -191,7 +191,7 @@ public class ScoreNodesStream extends TupleStream implements Expressible
         node.put("field", bucket);
       }
 
-      if(!node.fields.containsKey("node")) {
+      if(!node.getFields().containsKey("node")) {
         throw new IOException("node field not present in the Tuple");
       }
 
@@ -236,7 +236,7 @@ public class ScoreNodesStream extends TupleStream implements Expressible
           String term = terms.getName(t);
           Number docFreq = terms.get(term);
           Tuple tuple = nodes.get(term);
-          if(!tuple.fields.containsKey(termFreq)) {
+          if(!tuple.getFields().containsKey(termFreq)) {
             throw new Exception("termFreq field not present in the Tuple");
           }
           Number termFreqValue = (Number)tuple.get(termFreq);
@@ -265,9 +265,7 @@ public class ScoreNodesStream extends TupleStream implements Expressible
     if(tuples.hasNext()) {
       return tuples.next();
     } else {
-      Map map = new HashMap();
-      map.put("EOF", true);
-      return new Tuple(map);
+      return Tuple.EOF();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
index 24368a0..2d96d60 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -19,12 +19,10 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 
@@ -208,17 +206,14 @@ public class SearchStream extends TupleStream implements Expressible  {
 
   public Tuple read() throws IOException {
     if(documentIterator.hasNext()) {
-      Map map = new HashMap();
+      Tuple tuple = new Tuple();
       SolrDocument doc = documentIterator.next();
       for(Entry<String, Object> entry : doc.entrySet()) {
-        map.put(entry.getKey(), entry.getValue());
+        tuple.put(entry.getKey(), entry.getValue());
       }
-      return new Tuple(map);
-    } else {
-      Map fields = new HashMap();
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
       return tuple;
+    } else {
+      return Tuple.EOF();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java
index c538560..aee9c4c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java
@@ -257,8 +257,8 @@ public class SelectStream extends TupleStream implements Expressible {
     }
 
     // create a copy with the limited set of fields
-    Tuple workingToReturn = new Tuple(new HashMap<>());
-    Tuple workingForEvaluators = new Tuple(new HashMap<>());
+    Tuple workingToReturn = new Tuple();
+    Tuple workingForEvaluators = new Tuple();
 
     //Clear the TupleContext before running the evaluators.
     //The TupleContext allows evaluators to cache values within the scope of a single tuple.
@@ -267,7 +267,7 @@ public class SelectStream extends TupleStream implements Expressible {
 
     streamContext.getTupleContext().clear();
 
-    for(Object fieldName : original.fields.keySet()){
+    for(Object fieldName : original.getFields().keySet()){
       workingForEvaluators.put(fieldName, original.get(fieldName));
       if(selectedFields.containsKey(fieldName)){
         workingToReturn.put(selectedFields.get(fieldName), original.get(fieldName));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index c3a120f..ac100b3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -335,9 +335,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
           tuples.add(new Tuple(map));
         }
 
-        Map map = new HashMap();
-        map.put("EOF", true);
-        tuples.add(new Tuple(map));
+        tuples.add(Tuple.EOF());
         tupleIterator = tuples.iterator();
       }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index fad08d2..f3542ea 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -21,7 +21,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +42,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.StreamParams;
 import org.apache.solr.common.util.NamedList;
 
 /**
@@ -202,12 +202,10 @@ public class SolrStream extends TupleStream {
 
       if (fields == null) {
         //Return the EOF tuple.
-        Map m = new HashMap();
-        m.put("EOF", true);
-        return new Tuple(m);
+        return Tuple.EOF();
       } else {
 
-        String msg = (String) fields.get("EXCEPTION");
+        String msg = (String) fields.get(StreamParams.EXCEPTION);
         if (msg != null) {
           HandledException ioException = new HandledException(msg);
           throw ioException;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index 0fd7246..c05fc3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -270,10 +269,7 @@ public class StatsStream extends TupleStream implements Expressible  {
       ++index;
       return tuple;
     } else {
-      Map fields = new HashMap();
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
-      return tuple;
+      return Tuple.EOF();
     }
   }
 
@@ -308,7 +304,7 @@ public class StatsStream extends TupleStream implements Expressible  {
   private void getTuples(NamedList response,
                          Metric[] metrics) {
 
-    this.tuple = new Tuple(new HashMap());
+    this.tuple = new Tuple();
     NamedList facets = (NamedList)response.get("facets");
     fillTuple(tuple, facets, metrics);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index 819d3ae..3e6666d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -442,9 +442,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
     try {
 
       if(++iteration > maxIterations) {
-        Map map = new HashMap();
-        map.put("EOF", true);
-        return new Tuple(map);
+        return Tuple.EOF();
       } else {
 
         if (this.idfs == null) {
@@ -560,14 +558,13 @@ public class TextLogitStream extends TupleStream implements Expressible {
 
     @Override
     public Tuple read() throws IOException {
-      HashMap map = new HashMap();
-      if(it.hasNext()) {
-        map.put("term_s",it.next());
-        map.put("score_f",1.0);
-        return new Tuple(map);
+      if (it.hasNext()) {
+        Tuple tuple = new Tuple();
+        tuple.put("term_s", it.next());
+        tuple.put("score_f", 1.0);
+        return tuple;
       } else {
-        map.put("EOF", true);
-        return new Tuple(map);
+        return Tuple.EOF();
       }
     }
 
@@ -650,13 +647,13 @@ public class TextLogitStream extends TupleStream implements Expressible {
       List<Double> shardWeights = (List<Double>)logit.get("weights");
       double shardError = (double)logit.get("error");
 
-      Map map = new HashMap();
+      Tuple tuple = new Tuple();
 
-      map.put("error", shardError);
-      map.put("weights", shardWeights);
-      map.put("evaluation", logit.get("evaluation"));
+      tuple.put("error", shardError);
+      tuple.put("weights", shardWeights);
+      tuple.put("evaluation", logit.get("evaluation"));
 
-      return new Tuple(map);
+      return tuple;
     }
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index ee4570d..e9d7e7a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -22,10 +22,8 @@ import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
@@ -328,10 +326,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible  {
       ++index;
       return tuple;
     } else {
-      Map fields = new HashMap();
-      fields.put("EOF", true);
-      Tuple tuple = new Tuple(fields);
-      return tuple;
+      return Tuple.EOF();
     }
   }
 
@@ -383,7 +378,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible  {
                          String field,
                          Metric[] metrics) {
 
-    Tuple tuple = new Tuple(new HashMap());
+    Tuple tuple = new Tuple();
     NamedList facets = (NamedList)response.get("facets");
     fillTuples(tuples, tuple, facets, field, metrics);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index a7bca77..7dd6204 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -152,9 +152,7 @@ public class TupStream extends TupleStream implements Expressible {
 
     if(unnestedTuples == null) {
       if (finished) {
-        Map<String, Object> m = new HashMap<>();
-        m.put("EOF", true);
-        return new Tuple(m);
+        return Tuple.EOF();
       } else {
         finished = true;
         if(unnestedTuple != null) {
@@ -167,9 +165,7 @@ public class TupStream extends TupleStream implements Expressible {
       if(unnestedTuples.hasNext()) {
         return unnestedTuples.next();
       } else {
-        Map<String, Object> m = new HashMap<>();
-        m.put("EOF", true);
-        return new Tuple(m);
+        return Tuple.EOF();
       }
     }
   }
@@ -234,8 +230,8 @@ public class TupStream extends TupleStream implements Expressible {
       }
     }
     this.tup = new Tuple(values);
-    tup.fieldNames = fieldNames;
-    tup.fieldLabels = fieldLabels;
+    tup.setFieldNames(fieldNames);
+    tup.setFieldLabels(fieldLabels);
     // nothing to do here
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 5313f14..453f842 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -19,10 +19,8 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Optional;
 
 import org.apache.solr.client.solrj.SolrServerException;
@@ -298,7 +296,7 @@ public class UpdateStream extends TupleStream implements Expressible {
   
   private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
     SolrInputDocument doc = new SolrInputDocument();
-    for (Object field : tuple.fields.keySet()) {
+    for (Object field : tuple.getFields().keySet()) {
 
       if (! (field.equals(CommonParams.VERSION_FIELD) && pruneVersionField)) {
         Object value = tuple.get(field);
@@ -347,16 +345,16 @@ public class UpdateStream extends TupleStream implements Expressible {
   
   private Tuple createBatchSummaryTuple(int batchSize) {
     assert batchSize > 0;
-    Map m = new HashMap();
+    Tuple tuple = new Tuple();
     this.totalDocsIndex += batchSize;
     ++batchNumber;
-    m.put(BATCH_INDEXED_FIELD_NAME, batchSize);
-    m.put("totalIndexed", this.totalDocsIndex);
-    m.put("batchNumber", batchNumber);
-    if(coreName != null) {
-      m.put("worker", coreName);
+    tuple.put(BATCH_INDEXED_FIELD_NAME, batchSize);
+    tuple.put("totalIndexed", this.totalDocsIndex);
+    tuple.put("batchNumber", batchNumber);
+    if (coreName != null) {
+      tuple.put("worker", coreName);
     }
-    return new Tuple(m);
+    return tuple;
   }
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
index a85c33e..ea6606c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java
@@ -111,10 +111,7 @@ public class ZplotStream extends TupleStream implements Expressible {
     if(out.hasNext()) {
       return out.next();
     } else {
-      Map m = new HashMap();
-      m.put("EOF", true);
-      Tuple t = new Tuple(m);
-      return t;
+      return Tuple.EOF();
     }
   }
 
@@ -198,7 +195,7 @@ public class ZplotStream extends TupleStream implements Expressible {
     if(!table && !distribution && !clusters && !heat) {
       //Handle the vectors
       for (int i = 0; i < numTuples; i++) {
-        Tuple tuple = new Tuple(new HashMap());
+        Tuple tuple = new Tuple();
         for (Map.Entry<String, Object> entry : evaluated.entrySet()) {
           List l = (List) entry.getValue();
           tuple.put(entry.getKey(), l.get(i));
@@ -208,7 +205,7 @@ public class ZplotStream extends TupleStream implements Expressible {
       }
 
       //Generate the x axis if the tuples contain y and not x
-      if (outTuples.get(0).fields.containsKey("y") && !outTuples.get(0).fields.containsKey("x")) {
+      if (outTuples.get(0).getFields().containsKey("y") && !outTuples.get(0).getFields().containsKey("x")) {
         int x = 0;
         for (Tuple tuple : outTuples) {
           tuple.put("x", x++);
@@ -224,7 +221,7 @@ public class ZplotStream extends TupleStream implements Expressible {
           clusterNum++;
           List<KmeansEvaluator.ClusterPoint> points = c.getPoints();
           for (KmeansEvaluator.ClusterPoint p : points) {
-            Tuple tuple = new Tuple(new HashMap());
+            Tuple tuple = new Tuple();
             tuple.put("x", p.getPoint()[0]);
             tuple.put("y", p.getPoint()[1]);
             tuple.put("cluster", "cluster" + clusterNum);
@@ -239,7 +236,7 @@ public class ZplotStream extends TupleStream implements Expressible {
           clusterNum++;
           List<DbscanEvaluator.ClusterPoint> points = c.getPoints();
           for (DbscanEvaluator.ClusterPoint p : points) {
-            Tuple tuple = new Tuple(new HashMap());
+            Tuple tuple = new Tuple();
             tuple.put("x", p.getPoint()[0]);
             tuple.put("y", p.getPoint()[1]);
             tuple.put("cluster", "cluster" + clusterNum);
@@ -269,7 +266,7 @@ public class ZplotStream extends TupleStream implements Expressible {
         }
 
         for (int i = 0; i < x.length; i++) {
-          Tuple tuple = new Tuple(new HashMap());
+          Tuple tuple = new Tuple();
           if(!Double.isNaN(x[i])) {
             tuple.put("x", Precision.round(x[i], 2));
             if(y[i] == Double.NEGATIVE_INFINITY || y[i] == Double.POSITIVE_INFINITY) {
@@ -302,7 +299,7 @@ public class ZplotStream extends TupleStream implements Expressible {
         }
 
         for (int i = 0; i < x.length; i++) {
-          Tuple tuple = new Tuple(new HashMap());
+          Tuple tuple = new Tuple();
           tuple.put("x", x[i]);
           tuple.put("y", y[i]);
           outTuples.add(tuple);
@@ -312,16 +309,16 @@ public class ZplotStream extends TupleStream implements Expressible {
         if(list.get(0) instanceof Tuple) {
           List<Tuple> tlist = (List<Tuple>)o;
           Tuple tuple = tlist.get(0);
-          if(tuple.fields.containsKey("N")) {
+          if(tuple.getFields().containsKey("N")) {
             for(Tuple t : tlist) {
-              Tuple outtuple = new Tuple(new HashMap());
+              Tuple outtuple = new Tuple();
               outtuple.put("x", Precision.round(((double)t.get("mean")), 2));
               outtuple.put("y", t.get("prob"));
               outTuples.add(outtuple);
             }
-          } else if(tuple.fields.containsKey("count")) {
+          } else if(tuple.getFields().containsKey("count")) {
             for(Tuple t : tlist) {
-              Tuple outtuple = new Tuple(new HashMap());
+              Tuple outtuple = new Tuple();
               outtuple.put("x", t.get("value"));
               outtuple.put("y", t.get("pct"));
               outTuples.add(outtuple);
@@ -344,7 +341,7 @@ public class ZplotStream extends TupleStream implements Expressible {
           } else {
             rowLabel = Integer.toString(i);
           }
-          Tuple tuple = new Tuple(new HashMap());
+          Tuple tuple = new Tuple();
           tuple.put("rowLabel", rowLabel);
           double[] row = data[i];
           for (int j = 0; j < row.length; j++) {
@@ -378,7 +375,7 @@ public class ZplotStream extends TupleStream implements Expressible {
 
           double[] row = data[i];
           for (int j = 0; j < row.length; j++) {
-            Tuple tuple = new Tuple(new HashMap());
+            Tuple tuple = new Tuple();
             tuple.put("y", rowLabel);
             String colLabel = null;
             if (colLabels != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
index 4e176dd..451add8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java
@@ -45,17 +45,23 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric;
  */
 public class StreamFactory implements Serializable {
   
-  private transient HashMap<String,String> collectionZkHosts;
-  private transient HashMap<String,Supplier<Class<? extends Expressible>>> functionNames;
+  private transient HashMap<String, String> collectionZkHosts;
+  private transient HashMap<String, Supplier<Class<? extends Expressible>>> functionNames;
   private transient String defaultZkHost;
   private transient String defaultCollection;
+  private transient String defaultSort;
   
   public StreamFactory(){
     collectionZkHosts = new HashMap<>();
     functionNames = new HashMap<>();
   }
+
+  public StreamFactory(HashMap<String, Supplier<Class<? extends Expressible>>> functionNames) {
+    this.functionNames = functionNames;
+    collectionZkHosts = new HashMap<>();
+  }
   
-  public StreamFactory withCollectionZkHost(String collectionName, String zkHost){
+  public StreamFactory withCollectionZkHost(String collectionName, String zkHost) {
     this.collectionZkHosts.put(collectionName, zkHost);
     this.defaultCollection = collectionName;
     return this;
@@ -70,12 +76,27 @@ public class StreamFactory implements Serializable {
     return this;
   }
 
+  public Object clone() {
+    //Shallow copy
+    StreamFactory clone = new StreamFactory(functionNames);
+    return clone.withCollectionZkHost(defaultCollection, defaultZkHost).withDefaultSort(defaultSort);
+  }
+
+  public StreamFactory withDefaultSort(String sort) {
+    this.defaultSort = sort;
+    return this;
+  }
+
+  public String getDefaultSort() {
+    return this.defaultSort;
+  }
+
   public String getDefaultZkHost() {
     return this.defaultZkHost;
   }
 
-  public String getCollectionZkHost(String collectionName){
-    if(this.collectionZkHosts.containsKey(collectionName)){
+  public String getCollectionZkHost(String collectionName) {
+    if (this.collectionZkHosts.containsKey(collectionName)) {
       return this.collectionZkHosts.get(collectionName);
     }
     return null;
@@ -84,104 +105,104 @@ public class StreamFactory implements Serializable {
   public Map<String, Supplier<Class<? extends Expressible>>> getFunctionNames() {
     return Collections.unmodifiableMap(functionNames);
   }
-  public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz){
+
+  public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz) {
     this.functionNames.put(functionName, () -> clazz);
     return this;
   }
 
-   public StreamFactory withFunctionName(String functionName, Supplier< Class<? extends Expressible>> clazz){
+   public StreamFactory withFunctionName(String functionName, Supplier< Class<? extends Expressible>> clazz) {
     this.functionNames.put(functionName, clazz);
     return this;
   }
 
+  public StreamFactory withoutFunctionName(String functionName) {
+    this.functionNames.remove(functionName);
+    return this;
+  }
 
-  public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
-    if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){
+  public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex) {
+    if (null == expression.getParameters() || parameterIndex >= expression.getParameters().size()) {
       return null;
     }
-    
     return expression.getParameters().get(parameterIndex);
   }
   
-  public List<String> getValueOperands(StreamExpression expression){
+  public List<String> getValueOperands(StreamExpression expression) {
     return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue) item).getValue()).collect(Collectors.toList());
   }
   
   /** Given an expression, will return the value parameter at the given index, or null if doesn't exist */
-  public String getValueOperand(StreamExpression expression, int parameterIndex){
+  public String getValueOperand(StreamExpression expression, int parameterIndex) {
     StreamExpressionParameter parameter = getOperand(expression, parameterIndex);
-    if(null != parameter){ 
-      if(parameter instanceof StreamExpressionValue){
+    if (null != parameter) {
+      if (parameter instanceof StreamExpressionValue) {
         return ((StreamExpressionValue)parameter).getValue();
-      } else if(parameter instanceof StreamExpression) {
+      } else if (parameter instanceof StreamExpression) {
         return parameter.toString();
       }
     }
-    
     return null;
   }
   
-  public List<StreamExpressionNamedParameter> getNamedOperands(StreamExpression expression){
+  public List<StreamExpressionNamedParameter> getNamedOperands(StreamExpression expression) {
     List<StreamExpressionNamedParameter> namedParameters = new ArrayList<>();
-    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)){
-      namedParameters.add((StreamExpressionNamedParameter)parameter);
+    for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)) {
+      namedParameters.add((StreamExpressionNamedParameter) parameter);
     }
-    
     return namedParameters;
   }
-  public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name){
+
+  public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name) {
     List<StreamExpressionNamedParameter> namedParameters = getNamedOperands(expression);
-    for(StreamExpressionNamedParameter param : namedParameters){
-      if(param.getName().equals(name)){
+    for (StreamExpressionNamedParameter param : namedParameters) {
+      if (param.getName().equals(name)) {
         return param;
       }
     }
-    
     return null;
   }
   
-  public List<StreamExpression> getExpressionOperands(StreamExpression expression){
+  public List<StreamExpression> getExpressionOperands(StreamExpression expression) {
     List<StreamExpression> namedParameters = new ArrayList<>();
-    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
-      namedParameters.add((StreamExpression)parameter);
+    for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)) {
+      namedParameters.add((StreamExpression) parameter);
     }
-    
     return namedParameters;
   }
-  public List<StreamExpression> getExpressionOperands(StreamExpression expression, String functionName){
+
+  public List<StreamExpression> getExpressionOperands(StreamExpression expression, String functionName) {
     List<StreamExpression> namedParameters = new ArrayList<>();
-    for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){
-      StreamExpression expressionOperand = (StreamExpression)parameter;
-      if(expressionOperand.getFunctionName().equals(functionName)){
+    for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)) {
+      StreamExpression expressionOperand = (StreamExpression) parameter;
+      if (expressionOperand.getFunctionName().equals(functionName)) {
         namedParameters.add(expressionOperand);
       }
     }
-    
     return namedParameters;
   }
-  public List<StreamExpressionParameter> getOperandsOfType(StreamExpression expression, Class ... clazzes){
+
+  public List<StreamExpressionParameter> getOperandsOfType(StreamExpression expression, Class ... clazzes) {
     List<StreamExpressionParameter> parameters = new ArrayList<>();
     
     parameterLoop:
-     for(StreamExpressionParameter parameter : expression.getParameters()){
-      for(Class clazz : clazzes){
-        if(!clazz.isAssignableFrom(parameter.getClass())){
+     for (StreamExpressionParameter parameter : expression.getParameters()) {
+      for (Class clazz : clazzes) {
+        if (!clazz.isAssignableFrom(parameter.getClass())) {
           continue parameterLoop; // go to the next parameter since this parameter cannot be assigned to at least one of the classes
         }
       }
-      
       parameters.add(parameter);
     }
-    
     return parameters;
   }
   
-  public List<StreamExpression> getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes){
+  public List<StreamExpression> getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes) {
     List<StreamExpression> matchingStreamExpressions = new ArrayList<>();
     List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
     
     parameterLoop:
-    for(StreamExpression streamExpression : allStreamExpressions) {
+    for (StreamExpression streamExpression : allStreamExpressions) {
       Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(streamExpression.getFunctionName());
       if (classSupplier != null) {
         for (Class clazz : clazzes) {
@@ -189,180 +210,170 @@ public class StreamFactory implements Serializable {
             continue parameterLoop;
           }
         }
-
         matchingStreamExpressions.add(streamExpression);
       }
     }
-    
     return matchingStreamExpressions;   
   }
   
-  public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){
+  public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes) {
     Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(expression.getFunctionName());
-    if(classSupplier != null){
-      for(Class clazz : clazzes){
-        if(!clazz.isAssignableFrom(classSupplier.get())){
+    if (classSupplier != null) {
+      for (Class clazz : clazzes) {
+        if (!clazz.isAssignableFrom(classSupplier.get())) {
           return false;
         }
       }
       return true;
     }
-    
     return false;    
   }
   
-  public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{
+  public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException {
     StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
 
-    if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
-      if(null != defaultValue){
+    if (null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)) {
+      if (null != defaultValue) {
         return defaultValue;
       }
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName));
     }
-    String nStr = ((StreamExpressionValue)param.getParameter()).getValue();
-    try{
+    String nStr = ((StreamExpressionValue) param.getParameter()).getValue();
+    try {
       return Integer.parseInt(nStr);
-    }
-    catch(NumberFormatException e){
-      if(null != defaultValue){
+    } catch (NumberFormatException e) {
+      if (null != defaultValue) {
         return defaultValue;
       }
-      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr));
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.", expression, paramName, nStr));
     }
   }
 
-  public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{
+  public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException {
     StreamExpressionNamedParameter param = getNamedOperand(expression, paramName);
     
-    if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){
-      if(null != defaultValue){
+    if (null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)) {
+      if (null != defaultValue) {
         return defaultValue;
       }
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName));
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one", expression, paramName));
     }
-    String nStr = ((StreamExpressionValue)param.getParameter()).getValue();
+    String nStr = ((StreamExpressionValue) param.getParameter()).getValue();
     return Boolean.parseBoolean(nStr);
   }
 
-  
   public TupleStream constructStream(String expressionClause) throws IOException {
     return constructStream(StreamExpressionParser.parse(expressionClause));
   }
-  public TupleStream constructStream(StreamExpression expression) throws IOException{
+  public TupleStream constructStream(StreamExpression expression) throws IOException {
     String function = expression.getFunctionName();
     Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
 
-    if(classSupplier != null){
+    if (classSupplier != null) {
       Class<? extends Expressible> clazz =  classSupplier.get();
-      if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
+      if (Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)) {
         return (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
       }
     }
     
-    throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
+    throw new IOException(String.format(Locale.ROOT, "Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
   }
   
   public Metric constructMetric(String expressionClause) throws IOException {
     return constructMetric(StreamExpressionParser.parse(expressionClause));
   }
-  public Metric constructMetric(StreamExpression expression) throws IOException{
+
+  public Metric constructMetric(StreamExpression expression) throws IOException {
     String function = expression.getFunctionName();
     Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
-    if(classSupplier != null){
+    if (classSupplier != null) {
       Class<? extends Expressible> clazz = classSupplier.get();
-      if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){
+      if (Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)) {
         return (Metric)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
       }
     }
     
-    throw new IOException(String.format(Locale.ROOT,"Invalid metric expression %s - function '%s' is unknown (not mapped to a valid Metric)", expression, expression.getFunctionName()));
+    throw new IOException(String.format(Locale.ROOT, "Invalid metric expression %s - function '%s' is unknown (not mapped to a valid Metric)", expression, expression.getFunctionName()));
   }
 
   public StreamComparator constructComparator(String comparatorString, Class comparatorType) throws IOException {
-    if(comparatorString.contains(",")){
+    if (comparatorString.contains(",")) {
       String[] parts = comparatorString.split(",");
       StreamComparator[] comps = new StreamComparator[parts.length];
-      for(int idx = 0; idx < parts.length; ++idx){
+      for (int idx = 0; idx < parts.length; ++idx) {
         comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
       }
       return new MultipleFieldComparator(comps);
-    }
-    else if(comparatorString.contains("=")){
+    } else if (comparatorString.contains("=")) {
       // expected format is "left=right order"
       String[] parts = comparatorString.split("[ =]");
       
-      if(parts.length < 3){
-        throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
+      if (parts.length < 3) {
+        throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
       }
       
       String leftFieldName = null;
       String rightFieldName = null;
       String order = null;
-      for(String part : parts){
+      for (String part : parts) {
         // skip empty
-        if(null == part || 0 == part.trim().length()){ continue; }
+        if (null == part || 0 == part.trim().length()) { continue; }
         
         // assign each in order
-        if(null == leftFieldName){ 
+        if (null == leftFieldName) {
           leftFieldName = part.trim(); 
-        }
-        else if(null == rightFieldName){ 
+        } else if (null == rightFieldName) {
           rightFieldName = part.trim(); 
-        }
-        else {
+        } else {
           order = part.trim();
           break; // we're done, stop looping
         }
       }
       
-      if(null == leftFieldName || null == rightFieldName || null == order){
-        throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
+      if (null == leftFieldName || null == rightFieldName || null == order) {
+        throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
       }
       
-      return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) });
-    }
-    else{
+      return (StreamComparator) createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) });
+    } else {
       // expected format is "field order"
       String[] parts = comparatorString.split(" ");
-      if(2 != parts.length){
-        throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'field order'",comparatorString));
+      if (2 != parts.length) {
+        throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'field order'",comparatorString));
       }
       
       String fieldName = parts[0].trim();
       String order = parts[1].trim();
       
-      return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
+      return (StreamComparator) createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
     }
   }
     
   public StreamEqualitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
-    if(equalitorString.contains(",")){
+    if (equalitorString.contains(",")) {
       String[] parts = equalitorString.split(",");
       StreamEqualitor[] eqs = new StreamEqualitor[parts.length];
-      for(int idx = 0; idx < parts.length; ++idx){
+      for (int idx = 0; idx < parts.length; ++idx) {
         eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType);
       }
       return new MultipleFieldEqualitor(eqs);
-    }
-    else{
+    } else {
       String leftFieldName;
       String rightFieldName;
       
-      if(equalitorString.contains("=")){
+      if (equalitorString.contains("=")) {
         String[] parts = equalitorString.split("=");
-        if(2 != parts.length){
-          throw new IOException(String.format(Locale.ROOT,"Invalid equalitor expression %s - expecting fieldName=fieldName",equalitorString));
+        if (2 != parts.length) {
+          throw new IOException(String.format(Locale.ROOT, "Invalid equalitor expression %s - expecting fieldName=fieldName",equalitorString));
         }
         
         leftFieldName = parts[0].trim();
         rightFieldName = parts[1].trim();
-      }
-      else{
+      } else {
         leftFieldName = rightFieldName = equalitorString.trim();
       }
       
-      return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
+      return (StreamEqualitor) createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
     }
   }
   
@@ -386,18 +397,19 @@ public class StreamFactory implements Serializable {
   public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException {
     return constructEvaluator(StreamExpressionParser.parse(expressionClause));
   }
-  public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{
+
+  public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException {
     String function = expression.getFunctionName();
     Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
 
-    if(classSupplier != null){
+    if (classSupplier != null) {
       Class<? extends Expressible> clazz = classSupplier.get();
-      if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
+      if (Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)) {
         return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
       }
     }
     
-    throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
+    throw new IOException(String.format(Locale.ROOT, "Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
   }
 
   public boolean isStream(StreamExpression expression) throws IOException {
@@ -452,13 +464,13 @@ public class StreamFactory implements Serializable {
     throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
   }
 
-  public Object constructPrimitiveObject(String original){
+  public Object constructPrimitiveObject(String original) {
     String lower = original.trim().toLowerCase(Locale.ROOT);
 
-    if("null".equals(lower)){ return null; }
-    if("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); }
-    try{ return Long.valueOf(original); } catch(Exception ignored){};
-    try{ return Double.valueOf(original); } catch(Exception ignored){};
+    if ("null".equals(lower)) { return null; }
+    if ("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); }
+    try { return Long.valueOf(original); } catch(Exception ignored) { };
+    try { return Double.valueOf(original); } catch(Exception ignored) { };
 
     // is a string
     return original;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
index 61b8339..093b95e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 public class CountMetric extends Metric {
   private String columnName;
   private long count;
+  private boolean isAllColumns;
 
   public CountMetric() {
     this("*");
@@ -56,12 +57,13 @@ public class CountMetric extends Metric {
 
   private void init(String functionName, String columnName){
     this.columnName = columnName;
+    this.isAllColumns = "*".equals(this.columnName);
     setFunctionName(functionName);
     setIdentifier(functionName, "(", columnName, ")");
   }
 
   private boolean isAllColumns() {
-    return "*".equals(this.columnName);
+    return isAllColumns;
   }
 
   public void update(Tuple tuple) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java b/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java
new file mode 100644
index 0000000..417b849
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.params;
+
+/**
+ * Shared String constants for Streaming Expressions: parameter names, stream property names and tuple property names (interface fields are implicitly public static final).
+ */
+public interface StreamParams {
+
+  // parameter names (presumably the request parameter carrying the expression; confirm against the handlers that read it)
+  String EXPR = "expr";
+
+  // stream property names (lower-case, hyphen-separated values)
+  String TUPLE = "tuple";
+  String DOCS = "docs";
+  String RETURN_VALUE = "return-value";
+  String RESULT_SET = "result-set";
+
+  // tuple property names (upper-case values, unlike the stream property names above)
+  String RESPONSE_TIME = "RESPONSE_TIME";
+  String EOF = "EOF";
+  String EXCEPTION = "EXCEPTION";
+  String METRICS = "_METRICS_";
+
+  // other common tuple property names
+  String P_VALUE = "p-value";
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index a452465..e5426fd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -522,7 +522,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
           final List<Tuple> tuples = getTuples(daemonCheck);
           assertEquals(1, tuples.size()); // our daemon;
           if (log.isInfoEnabled()) {
-            log.info("Current daemon status: {}", tuples.get(0).fields);
+            log.info("Current daemon status: {}", tuples.get(0).getFields());
           }
           assertEquals(daemonId + " should have never had a successful iteration",
                        Long.valueOf(0L), tuples.get(0).getLong("iterations"));
@@ -808,7 +808,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
       log.trace("TupleStream: {}", tupleStream);
       tupleStream.open();
       for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
-        log.trace("Tuple: {}", t.fields);
+        log.trace("Tuple: {}", t.getFields());
         tuples.add(t);
       }
     } finally {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index 2fbe101..8b74a66 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -642,7 +642,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
   protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(!tuple.fields.containsKey(field)){
+        if(!tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
         }
       }
@@ -653,7 +653,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
   protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(tuple.fields.containsKey(field)){
+        if(tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
         }
       }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index cf86691..744632b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -167,7 +167,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
   protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(!tuple.fields.containsKey(field)){
+        if(!tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
         }
       }
@@ -177,7 +177,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
   protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(tuple.fields.containsKey(field)){
+        if(tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
         }
       }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 73f6f9d..f57f655 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -2296,7 +2296,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       tuples = getTuples(stream);
       
       assertEquals(1, tuples.size());
-      assertFalse(tuples.get(0).fields.containsKey("extra_s"));
+      assertFalse(tuples.get(0).getFields().containsKey("extra_s"));
     
     } finally {
       solrClientCache.close();
@@ -4467,7 +4467,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(!tuple.fields.containsKey(field)){
+        if(!tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
         }
       }
@@ -4477,7 +4477,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
     for(Tuple tuple : tuples){
       for(String field : fields){
-        if(tuple.fields.containsKey(field)){
+        if(tuple.getFields().containsKey(field)){
           throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
         }
       }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 7e5da9d..7288e7a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -674,7 +674,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
       tuples4 = getTuples(solrStream);
       assert(tuples4.size() == 500);
-      Map fields = tuples4.get(0).fields;
+      Map fields = tuples4.get(0).getFields();
       assert(fields.containsKey("id"));
       assert(fields.containsKey("a_f"));
       assert(fields.containsKey("a_i"));
@@ -3028,7 +3028,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       tuples = getTuples(stream);
       assertEquals(100, tuples.size());
       Tuple lastModel = tuples.get(0);
-      ClassificationEvaluation evaluation = ClassificationEvaluation.create(lastModel.fields);
+      ClassificationEvaluation evaluation = ClassificationEvaluation.create(lastModel.getFields());
       assertTrue(evaluation.getF1() >= 1.0);
       assertEquals(Math.log(5000.0 / (2500 + 1)), lastModel.getDoubles("idfs_ds").get(0), 0.0001);
       // make sure the tuples is retrieved in correct order