You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dp...@apache.org on 2017/06/28 18:17:32 UTC

[09/20] lucene-solr:master: SOLR-10123: Upgraded the Analytics Component to version 2.0

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/StatsCollectorSupplierFactory.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/StatsCollectorSupplierFactory.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/StatsCollectorSupplierFactory.java
deleted file mode 100644
index e22362d..0000000
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/StatsCollectorSupplierFactory.java
+++ /dev/null
@@ -1,646 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.analytics.statistics;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-
-import org.apache.lucene.queries.function.ValueSource;
-import org.apache.lucene.queries.function.valuesource.BytesRefFieldSource;
-import org.apache.lucene.queries.function.valuesource.DoubleFieldSource;
-import org.apache.lucene.queries.function.valuesource.FloatFieldSource;
-import org.apache.lucene.queries.function.valuesource.IntFieldSource;
-import org.apache.lucene.queries.function.valuesource.LongFieldSource;
-import org.apache.solr.analytics.expression.ExpressionFactory;
-import org.apache.solr.analytics.request.ExpressionRequest;
-import org.apache.solr.analytics.util.AnalyticsParams;
-import org.apache.solr.analytics.util.valuesource.AbsoluteValueDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.AddDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.ConstDateSource;
-import org.apache.solr.analytics.util.valuesource.ConstDoubleSource;
-import org.apache.solr.analytics.util.valuesource.ConstStringSource;
-import org.apache.solr.analytics.util.valuesource.DateFieldSource;
-import org.apache.solr.analytics.util.valuesource.DateMathFunction;
-import org.apache.solr.analytics.util.valuesource.DivDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.DualDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.FilterFieldSource;
-import org.apache.solr.analytics.util.valuesource.LogDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.MultiDateFunction;
-import org.apache.solr.analytics.util.valuesource.MultiDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.MultiplyDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.NegateDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.PowDoubleFunction;
-import org.apache.solr.analytics.util.valuesource.ReverseStringFunction;
-import org.apache.solr.analytics.util.valuesource.SingleDoubleFunction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.schema.FieldType;
-import org.apache.solr.schema.IndexSchema;
-import org.apache.solr.schema.SchemaField;
-import org.apache.solr.schema.StrField;
-import org.apache.solr.schema.TrieDateField;
-import org.apache.solr.schema.TrieDoubleField;
-import org.apache.solr.schema.TrieFloatField;
-import org.apache.solr.schema.TrieIntField;
-import org.apache.solr.schema.TrieLongField;
-import org.apache.solr.search.function.ConcatStringFunction;
-import org.apache.solr.util.DateMathParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StatsCollectorSupplierFactory {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
-  // FunctionTypes
-  final static int NUMBER_TYPE = 0;
-  final static int DATE_TYPE = 1;
-  final static int STRING_TYPE = 2;
-  final static int FIELD_TYPE = 3;
-  final static int FILTER_TYPE = 4;
-  
-  /**
-   * Builds a Supplier that will generate identical arrays of new StatsCollectors.
-   * 
-   * @param schema The Schema being used.
-   * @param exRequests The expression requests to generate a StatsCollector[] from.
-   * @return A Supplier that will return an array of new StatsCollector.
-   */
-  @SuppressWarnings("unchecked")
-  public static Supplier<StatsCollector[]> create(IndexSchema schema, List<ExpressionRequest> exRequests ) {
-    final Map<String, Set<String>> collectorStats =  new TreeMap<>();
-    final Map<String, Set<Integer>> collectorPercs =  new TreeMap<>();
-    final Map<String, ValueSource> collectorSources =  new TreeMap<>();
-    
-    // Iterate through all expression request to make a list of ValueSource strings
-    // and statistics that need to be calculated on those ValueSources.
-    for (ExpressionRequest expRequest : exRequests) {
-      String statExpression = expRequest.getExpressionString();
-      Set<String> statistics = getStatistics(statExpression);
-      if (statistics == null) {
-        continue;
-      }
-      for (String statExp : statistics) {
-        String stat;
-        String operands;
-        try {
-          stat = statExp.substring(0, statExp.indexOf('(')).trim();
-          operands = statExp.substring(statExp.indexOf('(')+1, statExp.lastIndexOf(')')).trim();
-        } catch (Exception e) {
-          throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to parse statistic: ["+statExpression+"]",e);
-        }
-        String[] arguments = ExpressionFactory.getArguments(operands);
-        String source = arguments[0];
-        if (stat.equals(AnalyticsParams.STAT_PERCENTILE)) {
-          // The statistic is a percentile, extra parsing is required
-          if (arguments.length<2) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,"Too few arguments given for "+stat+"() in ["+statExp+"].");
-          } else if (arguments.length>2) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,"Too many arguments given for "+stat+"() in ["+statExp+"].");
-          }
-          source = arguments[1];
-          Set<Integer> percs = collectorPercs.get(source);
-          if (percs == null) {
-            percs = new HashSet<>();
-            collectorPercs.put(source, percs);
-          }
-          try {
-            int perc = Integer.parseInt(arguments[0]);
-            if (perc>0 && perc<100) {
-              percs.add(perc);
-            } else {
-              throw new SolrException(ErrorCode.BAD_REQUEST,"The percentile in ["+statExp+"] is not between 0 and 100, exculsive.");
-            }
-          } catch (NumberFormatException e) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,"\""+arguments[0]+"\" cannot be converted into a percentile.",e);
-          }
-        } else if (arguments.length>1) {
-          throw new SolrException(ErrorCode.BAD_REQUEST,"Too many arguments given for "+stat+"() in ["+statExp+"].");
-        } else if (arguments.length==0) {
-          throw new SolrException(ErrorCode.BAD_REQUEST,"No arguments given for "+stat+"() in ["+statExp+"].");
-        } 
-        // Only unique ValueSources will be made; therefore statistics must be accumulated for
-        // each ValueSource, even across different expression requests
-        Set<String> stats = collectorStats.get(source);
-        if (stats == null) {
-          stats = new HashSet<>();
-          collectorStats.put(source, stats);
-        }
-        if(AnalyticsParams.STAT_PERCENTILE.equals(stat)) {
-          stats.add(stat + "_"+ arguments[0]);
-        } else {
-          stats.add(stat);
-        }
-      }
-    }
-    String[] keys = collectorStats.keySet().toArray(new String[0]);
-    for (String sourceStr : keys) {
-      // Build one ValueSource for each unique value source string
-      ValueSource source = buildSourceTree(schema, sourceStr);
-      if (source == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The statistic ["+sourceStr+"] could not be parsed.");
-      }
-      String builtString = source.toString();
-      collectorSources.put(builtString,source);
-      // Replace the user given string with the correctly built string
-      if (!builtString.equals(sourceStr)) {
-        Set<String> stats = collectorStats.remove(sourceStr);
-        if (stats!=null) {
-          collectorStats.put(builtString, stats);
-        }
-        Set<Integer> percs = collectorPercs.remove(sourceStr);
-        if (percs!=null) {
-          collectorPercs.put(builtString, percs);
-        }
-        for (ExpressionRequest er : exRequests) {
-          er.setExpressionString(er.getExpressionString().replace(sourceStr, builtString));
-        }
-      }
-    }
-    if (collectorSources.size()==0) {
-      return new Supplier<StatsCollector[]>() {
-        @Override
-        public StatsCollector[] get() {
-          return new StatsCollector[0];
-        }
-      };
-    }
-    
-    log.info("Stats objects: "+collectorStats.size()+" sr="+collectorSources.size()+" pr="+collectorPercs.size() );
-    
-    // All information is stored in final arrays so that nothing 
-    // has to be computed when the Supplier's get() method is called.
-    final Set<String>[] statsArr = collectorStats.values().toArray(new Set[0]);
-    final ValueSource[] sourceArr = collectorSources.values().toArray(new ValueSource[0]);
-    final boolean[] uniqueBools = new boolean[statsArr.length];
-    final boolean[] medianBools = new boolean[statsArr.length];
-    final boolean[] numericBools = new boolean[statsArr.length];
-    final boolean[] dateBools = new boolean[statsArr.length];
-    final double[][] percsArr = new double[statsArr.length][];
-    final String[][] percsNames = new String[statsArr.length][];
-    for (int count = 0; count < sourceArr.length; count++) {
-      uniqueBools[count] = statsArr[count].contains(AnalyticsParams.STAT_UNIQUE);
-      medianBools[count] = statsArr[count].contains(AnalyticsParams.STAT_MEDIAN);
-      numericBools[count] = statsArr[count].contains(AnalyticsParams.STAT_SUM)||statsArr[count].contains(AnalyticsParams.STAT_SUM_OF_SQUARES)||statsArr[count].contains(AnalyticsParams.STAT_MEAN)||statsArr[count].contains(AnalyticsParams.STAT_STANDARD_DEVIATION);
-      dateBools[count] = (sourceArr[count] instanceof DateFieldSource) | (sourceArr[count] instanceof MultiDateFunction) | (sourceArr[count] instanceof ConstDateSource);
-      Set<Integer> ps = collectorPercs.get(sourceArr[count].toString());
-      if (ps!=null) {
-        percsArr[count] = new double[ps.size()];
-        percsNames[count] = new String[ps.size()];
-        int percCount = 0;
-        for (int p : ps) {
-          percsArr[count][percCount] = p/100.0;
-          percsNames[count][percCount++] = AnalyticsParams.STAT_PERCENTILE+"_"+p;
-        }
-      }
-    }
-    // Making the Supplier
-    return new Supplier<StatsCollector[]>() {
-      public StatsCollector[] get() {
-        StatsCollector[] collectors = new StatsCollector[statsArr.length];
-        for (int count = 0; count < statsArr.length; count++) {
-          if(numericBools[count]){
-            StatsCollector sc = new NumericStatsCollector(sourceArr[count], statsArr[count]);
-            if(uniqueBools[count]) sc = new UniqueStatsCollector(sc);
-            if(medianBools[count]) sc = new MedianStatsCollector(sc);
-            if(percsArr[count]!=null) sc = new PercentileStatsCollector(sc,percsArr[count],percsNames[count]);
-            collectors[count]=sc;
-          } else if (dateBools[count]) {
-            StatsCollector sc = new MinMaxStatsCollector(sourceArr[count], statsArr[count]);
-            if(uniqueBools[count]) sc = new UniqueStatsCollector(sc);
-            if(medianBools[count]) sc = new DateMedianStatsCollector(sc);
-            if(percsArr[count]!=null) sc = new PercentileStatsCollector(sc,percsArr[count],percsNames[count]);
-           collectors[count]=sc;
-          } else {
-            StatsCollector sc = new MinMaxStatsCollector(sourceArr[count], statsArr[count]);
-            if(uniqueBools[count]) sc = new UniqueStatsCollector(sc);
-            if(medianBools[count]) sc = new MedianStatsCollector(sc);
-            if(percsArr[count]!=null) sc = new PercentileStatsCollector(sc,percsArr[count],percsNames[count]);
-            collectors[count]=sc;
-          }
-        }
-        return collectors;
-      }
-    };
-  }
-  
-  /**
-   * Finds the set of statistics that must be computed for the expression.
-   * @param expression The string representation of an expression
-   * @return The set of statistics (sum, mean, median, etc.) found in the expression
-   */
-  public static Set<String> getStatistics(String expression) {
-    HashSet<String> set = new HashSet<>();
-    int firstParen = expression.indexOf('(');
-    if (firstParen>0) {
-      String topOperation = expression.substring(0,firstParen).trim();
-      if (AnalyticsParams.ALL_STAT_SET.contains(topOperation)) {
-        set.add(expression);
-      } else if (!(topOperation.equals(AnalyticsParams.CONSTANT_NUMBER)||topOperation.equals(AnalyticsParams.CONSTANT_DATE)||topOperation.equals(AnalyticsParams.CONSTANT_STRING))) {
-        String operands = expression.substring(firstParen+1, expression.lastIndexOf(')')).trim();
-        String[] arguments = ExpressionFactory.getArguments(operands);
-        for (String argument : arguments) {
-          Set<String> more = getStatistics(argument);
-          if (more!=null) {
-            set.addAll(more);
-          }
-        }
-      }
-    }
-    if (set.size()==0) {
-      return null;
-    }
-    return set;
-  }
-  
-  /**
-   * Builds a Value Source from a given string
-   * 
-   * @param schema The schema being used.
-   * @param expression The string to be turned into an expression.
-   * @return The completed ValueSource
-   */
-  private static ValueSource buildSourceTree(IndexSchema schema, String expression) {
-    return buildSourceTree(schema,expression,FIELD_TYPE);
-  }
-  
-  /**
-   * Builds a Value Source from a given string and a given source type
-   * 
-   * @param schema The schema being used.
-   * @param expression The string to be turned into an expression.
-   * @param sourceType The type of source that must be returned.
-   * @return The completed ValueSource
-   */
-  private static ValueSource buildSourceTree(IndexSchema schema, String expression, int sourceType) {
-    int expressionType = getSourceType(expression);
-    if (sourceType != FIELD_TYPE && expressionType != FIELD_TYPE && 
-        expressionType != FILTER_TYPE && expressionType != sourceType) {
-      return null;
-    }
-    switch (expressionType) {
-    case NUMBER_TYPE : return buildNumericSource(schema, expression);
-    case DATE_TYPE : return buildDateSource(schema, expression);
-    case STRING_TYPE : return buildStringSource(schema, expression);
-    case FIELD_TYPE : return buildFieldSource(schema, expression, sourceType);
-    case FILTER_TYPE : return buildFilterSource(schema, expression.substring(expression.indexOf('(')+1,expression.lastIndexOf(')')), sourceType);
-    default : throw new SolrException(ErrorCode.BAD_REQUEST,expression+" is not a valid operation.");
-    }
-  }
-
-  /**
-   * Determines what type of value source the expression represents.
-   * 
-   * @param expression The expression representing the desired ValueSource
-   * @return NUMBER_TYPE, DATE_TYPE, STRING_TYPE or -1
-   */
-  private static int getSourceType(String expression) {
-    int paren = expression.indexOf('(');
-    if (paren<0) {
-      return FIELD_TYPE;
-    }
-    String operation = expression.substring(0,paren).trim();
-
-    if (AnalyticsParams.NUMERIC_OPERATION_SET.contains(operation)) {
-      return NUMBER_TYPE;
-    } else if (AnalyticsParams.DATE_OPERATION_SET.contains(operation)) {
-      return DATE_TYPE;
-    } else if (AnalyticsParams.STRING_OPERATION_SET.contains(operation)) {
-      return STRING_TYPE;
-    } else if (operation.equals(AnalyticsParams.FILTER)) {
-      return FILTER_TYPE;
-    }
-    throw new SolrException(ErrorCode.BAD_REQUEST,"The operation \""+operation+"\" in ["+expression+"] is not supported.");
-  }
-  
-  /**
-   *  Builds a value source for a given field, making sure that the field fits a given source type.
-   * @param schema the schema
-   * @param expressionString The name of the field to build a Field Source from.
-   * @param sourceType FIELD_TYPE for any type of field, NUMBER_TYPE for numeric fields, 
-   * DATE_TYPE for date fields and STRING_TYPE for string fields.
-   * @return a value source
-   */
-  private static ValueSource buildFieldSource(IndexSchema schema, String expressionString, int sourceType) {
-    SchemaField sf;
-    try {
-      sf = schema.getField(expressionString);
-    } catch (SolrException e) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,"The field "+expressionString+" does not exist.",e);
-    }
-    FieldType type = sf.getType();
-    if ( type instanceof TrieIntField) {
-      if (sourceType!=NUMBER_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new IntFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    } else if (type instanceof TrieLongField) {
-      if (sourceType!=NUMBER_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new LongFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    } else if (type instanceof TrieFloatField) {
-      if (sourceType!=NUMBER_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new FloatFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    } else if (type instanceof TrieDoubleField) {
-      if (sourceType!=NUMBER_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new DoubleFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    } else if (type instanceof TrieDateField) {
-      if (sourceType!=DATE_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new DateFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    } else if (type instanceof StrField) {
-      if (sourceType!=STRING_TYPE&&sourceType!=FIELD_TYPE) {
-        return null;
-      }
-      return new BytesRefFieldSource(expressionString) {
-        public String description() {
-          return field;
-        }
-      };
-    }
-    throw new SolrException(ErrorCode.BAD_REQUEST, type.toString()+" is not a supported field type in Solr Analytics.");
-  }
-  
-  /**
-   * Builds a default is missing source that wraps a given source. A missing value is required for all 
-   * non-field value sources.
-   * @param schema the schema
-   * @param expressionString The name of the field to build a Field Source from.
-   * @param sourceType FIELD_TYPE for any type of field, NUMBER_TYPE for numeric fields, 
-   * DATE_TYPE for date fields and STRING_TYPE for string fields.
-   * @return a value source
-   */
-  @SuppressWarnings("deprecation")
-  private static ValueSource buildFilterSource(IndexSchema schema, String expressionString, int sourceType) {
-    String[] arguments = ExpressionFactory.getArguments(expressionString);
-    if (arguments.length!=2) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,"Invalid arguments were given for \""+AnalyticsParams.FILTER+"\".");
-    }
-    ValueSource delegateSource = buildSourceTree(schema, arguments[0], sourceType);
-    if (delegateSource==null) {
-      return null;
-    }
-    Object defaultObject;
-
-    ValueSource src = delegateSource;
-    if (delegateSource instanceof FilterFieldSource) {
-      src = ((FilterFieldSource)delegateSource).getRootSource();
-    }
-    if ( src instanceof IntFieldSource) {
-      try {
-        defaultObject = new Integer(arguments[1]);
-      } catch (NumberFormatException e) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The filter value "+arguments[1]+" cannot be converted into an integer.",e);
-      }
-    } else if ( src instanceof DateFieldSource || src instanceof MultiDateFunction) {
-      defaultObject = DateMathParser.parseMath(null, arguments[1]);
-    } else if ( src instanceof LongFieldSource ) {
-      try {
-        defaultObject = new Long(arguments[1]);
-      } catch (NumberFormatException e) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The filter value "+arguments[1]+" cannot be converted into a long.",e);
-      }
-    } else if ( src instanceof FloatFieldSource ) {
-      try {
-        defaultObject = new Float(arguments[1]);
-      } catch (NumberFormatException e) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The filter value "+arguments[1]+" cannot be converted into a float.",e);
-      }
-    } else if ( src instanceof DoubleFieldSource || src instanceof SingleDoubleFunction ||
-                src instanceof DualDoubleFunction|| src instanceof MultiDoubleFunction) {
-      try {
-        defaultObject = new Double(arguments[1]);
-      } catch (NumberFormatException e) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The filter value "+arguments[1]+" cannot be converted into a double.",e);
-      }
-    } else {
-      defaultObject = arguments[1];
-    }
-    return new FilterFieldSource(delegateSource,defaultObject);
-  } 
-  
-  /**
-   * Recursively parses and breaks down the expression string to build a numeric ValueSource.
-   * 
-   * @param schema The schema to pull fields from.
-   * @param expressionString The expression string to build a ValueSource from.
-   * @return The value source represented by the given expressionString
-   */
-  private static ValueSource buildNumericSource(IndexSchema schema, String expressionString) {
-    int paren = expressionString.indexOf('(');
-    String[] arguments;
-    String operands;
-    if (paren<0) {
-      return buildFieldSource(schema,expressionString,NUMBER_TYPE);
-    } else {
-      try {
-        operands = expressionString.substring(paren+1, expressionString.lastIndexOf(')')).trim();
-      } catch (Exception e) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"Missing closing parenthesis in ["+expressionString+"]");
-      }
-      arguments = ExpressionFactory.getArguments(operands);
-    }
-    String operation = expressionString.substring(0, paren).trim();
-    if (operation.equals(AnalyticsParams.CONSTANT_NUMBER)) {
-      if (arguments.length!=1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The constant number declaration ["+expressionString+"] does not have exactly 1 argument.");
-      }
-      return new ConstDoubleSource(Double.parseDouble(arguments[0]));
-    } else if (operation.equals(AnalyticsParams.NEGATE)) {
-      if (arguments.length!=1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The negate operation ["+expressionString+"] does not have exactly 1 argument.");
-      }
-      ValueSource argSource = buildNumericSource(schema, arguments[0]);
-      if (argSource==null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The operation \""+AnalyticsParams.NEGATE+"\" requires a numeric field or operation as argument. \""+arguments[0]+"\" is not a numeric field or operation.");
-      }
-      return new NegateDoubleFunction(argSource);
-    }  else if (operation.equals(AnalyticsParams.ABSOLUTE_VALUE)) {
-      if (arguments.length!=1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The absolute value operation ["+expressionString+"] does not have exactly 1 argument.");
-      }
-      ValueSource argSource = buildNumericSource(schema, arguments[0]);
-      if (argSource==null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The operation \""+AnalyticsParams.NEGATE+"\" requires a numeric field or operation as argument. \""+arguments[0]+"\" is not a numeric field or operation.");
-      }
-      return new AbsoluteValueDoubleFunction(argSource);
-    } else if (operation.equals(AnalyticsParams.FILTER)) {
-      return buildFilterSource(schema, operands, NUMBER_TYPE);
-    }
-    List<ValueSource> subExpressions = new ArrayList<>();
-    for (String argument : arguments) {
-      ValueSource argSource = buildNumericSource(schema, argument);
-      if (argSource == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The operation \""+operation+"\" requires numeric fields or operations as arguments. \""+argument+"\" is not a numeric field or operation.");
-      }
-      subExpressions.add(argSource);
-    }
-    if (operation.equals(AnalyticsParams.ADD)) {
-      return new AddDoubleFunction(subExpressions.toArray(new ValueSource[0]));
-    } else if (operation.equals(AnalyticsParams.MULTIPLY)) {
-      return new MultiplyDoubleFunction(subExpressions.toArray(new ValueSource[0]));
-    } else if (operation.equals(AnalyticsParams.DIVIDE)) {
-      if (subExpressions.size()!=2) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The divide operation ["+expressionString+"] does not have exactly 2 arguments.");
-      }
-      return new DivDoubleFunction(subExpressions.get(0),subExpressions.get(1));
-    } else if (operation.equals(AnalyticsParams.POWER)) {
-      if (subExpressions.size()!=2) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The power operation ["+expressionString+"] does not have exactly 2 arguments.");
-      }
-      return new PowDoubleFunction(subExpressions.get(0),subExpressions.get(1));
-    } else if (operation.equals(AnalyticsParams.LOG)) {
-      if (subExpressions.size()!=2) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The log operation ["+expressionString+"] does not have exactly 2 arguments.");
-      }
-      return new LogDoubleFunction(subExpressions.get(0), subExpressions.get(1));
-    } 
-    if (AnalyticsParams.DATE_OPERATION_SET.contains(operation)||AnalyticsParams.STRING_OPERATION_SET.contains(operation)) {
-      return null;
-    }
-    throw new SolrException(ErrorCode.BAD_REQUEST,"The operation ["+expressionString+"] is not supported.");
-  }
-
-  
-  /**
-   * Recursively parses and breaks down the expression string to build a date ValueSource.
-   * 
-   * @param schema The schema to pull fields from.
-   * @param expressionString The expression string to build a ValueSource from.
-   * @return The value source represented by the given expressionString
-   */
-  @SuppressWarnings("deprecation")
-  private static ValueSource buildDateSource(IndexSchema schema, String expressionString) {
-    int paren = expressionString.indexOf('(');
-    String[] arguments;
-    if (paren<0) {
-      return buildFieldSource(schema, expressionString, DATE_TYPE);
-    } else {
-      arguments = ExpressionFactory.getArguments(expressionString.substring(paren+1, expressionString.lastIndexOf(')')).trim());
-    }
-    String operands = arguments[0];
-    String operation = expressionString.substring(0, paren).trim();
-    if (operation.equals(AnalyticsParams.CONSTANT_DATE)) {
-      if (arguments.length!=1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"The constant date declaration ["+expressionString+"] does not have exactly 1 argument.");
-      }
-      return new ConstDateSource(DateMathParser.parseMath(null, operands));
-    } else if (operation.equals(AnalyticsParams.FILTER)) {
-      return buildFilterSource(schema, operands, DATE_TYPE);
-    }
-    if (operation.equals(AnalyticsParams.DATE_MATH)) {
-      List<ValueSource> subExpressions = new ArrayList<>();
-      boolean first = true;
-      for (String argument : arguments) {
-        ValueSource argSource;
-        if (first) {
-          first = false;
-          argSource = buildDateSource(schema, argument);
-          if (argSource == null) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,"\""+AnalyticsParams.DATE_MATH+"\" requires the first argument be a date operation or field. ["+argument+"] is not a date operation or field.");
-          }
-        } else {
-          argSource = buildStringSource(schema, argument);
-          if (argSource == null) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,"\""+AnalyticsParams.DATE_MATH+"\" requires that all arguments except the first be string operations. ["+argument+"] is not a string operation.");
-          }
-        }
-        subExpressions.add(argSource);
-      }
-      return new DateMathFunction(subExpressions.toArray(new ValueSource[0]));
-    }
-    if (AnalyticsParams.NUMERIC_OPERATION_SET.contains(operation)||AnalyticsParams.STRING_OPERATION_SET.contains(operation)) {
-      return null;
-    }
-    throw new SolrException(ErrorCode.BAD_REQUEST,"The operation ["+expressionString+"] is not supported.");
-  }
-
-  
-  /**
-   * Recursively parses and breaks down the expression string to build a string ValueSource.
-   * 
-   * @param schema The schema to pull fields from.
-   * @param expressionString The expression string to build a ValueSource from.
-   * @return The value source represented by the given expressionString
-   */
-  private static ValueSource buildStringSource(IndexSchema schema, String expressionString) {
-    int paren = expressionString.indexOf('(');
-    String[] arguments;
-    if (paren<0) {
-      return buildFieldSource(schema, expressionString, FIELD_TYPE);
-    } else {
-      arguments = ExpressionFactory.getArguments(expressionString.substring(paren+1, expressionString.lastIndexOf(')')).trim());
-    }
-    String operands = arguments[0];
-    String operation = expressionString.substring(0, paren).trim();
-    if (operation.equals(AnalyticsParams.CONSTANT_STRING)) {
-      operands = expressionString.substring(paren+1, expressionString.lastIndexOf(')'));
-      return new ConstStringSource(operands);
-    } else if (operation.equals(AnalyticsParams.FILTER)) {
-      return buildFilterSource(schema,operands,FIELD_TYPE);
-    } else if (operation.equals(AnalyticsParams.REVERSE)) {
-      if (arguments.length!=1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"\""+AnalyticsParams.REVERSE+"\" requires exactly one argument. The number of arguments in "+expressionString+" is not 1.");
-      }
-      return new ReverseStringFunction(buildStringSource(schema, operands));
-    }
-    List<ValueSource> subExpressions = new ArrayList<>();
-    for (String argument : arguments) {
-      subExpressions.add(buildSourceTree(schema, argument));
-    }
-    if (operation.equals(AnalyticsParams.CONCATENATE)) {
-      return new ConcatStringFunction(subExpressions.toArray(new ValueSource[0]));
-    } 
-    if (AnalyticsParams.NUMERIC_OPERATION_SET.contains(operation)) {
-      return buildNumericSource(schema, expressionString);
-    } else if (AnalyticsParams.DATE_OPERATION_SET.contains(operation)) {
-      return buildDateSource(schema, expressionString);
-    }
-    throw new SolrException(ErrorCode.BAD_REQUEST,"The operation ["+expressionString+"] is not supported.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/UniqueStatsCollector.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/UniqueStatsCollector.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/UniqueStatsCollector.java
deleted file mode 100644
index 461b0f4..0000000
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/UniqueStatsCollector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.analytics.statistics;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * <code>UniqueValueCounter</code> computes the number of unique values.
- */
-public class UniqueStatsCollector extends AbstractDelegatingStatsCollector{
-  private final Set<Object> uniqueValues = new HashSet<>();
-  
-  public UniqueStatsCollector(StatsCollector delegate) {
-    super(delegate);
-  }
-  
-  @Override
-  public void collect(int doc) throws IOException {
-    super.collect(doc);
-    if (value.exists) {
-      uniqueValues.add(value.toObject());
-    }
-  }
-
-  @Override
-  public Comparable getStat(String stat) {
-    if (stat.equals("unique")) {
-      return new Long(uniqueValues.size());
-    }
-    return delegate.getStat(stat);
-  }
-
-  @Override
-  public void compute() {
-    delegate.compute();
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/package-info.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/package-info.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/package-info.java
deleted file mode 100644
index 90fa12d..0000000
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/statistics/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-/** 
- * Statistics collectors reduce a list of Objects to a single value. Most implementations reduce a list to a statistic on that list.
- */
-package org.apache.solr.analytics.statistics;
-
-
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
new file mode 100644
index 0000000..f65e58f
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.analytics.AnalyticsRequestManager;
+import org.apache.solr.analytics.AnalyticsRequestParser;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.handler.AnalyticsHandler;
+import org.apache.solr.handler.component.AnalyticsComponent;
+import org.apache.solr.response.AnalyticsShardResponseWriter;
+
+/**
+ * This class manages the requesting of shard responses from all shards in the queried collection.
+ * 
+ * <p>
+ * Shard Requests are sent to the {@link AnalyticsHandler} instead of the {@link AnalyticsComponent},
+ * which is the entrance to the analytics component for all client requests.
+ */
+public class AnalyticsShardRequestManager {
+  private final SolrParams params;
+  protected transient CloudSolrClient cloudSolrClient;
+  protected transient List<String> replicaUrls;
+  
+  /**
+   * All shards responses, which are received in parallel, are funneled into the manager.
+   * So the manager must be transient.
+   */
+  private transient final AnalyticsRequestManager manager;
+
+  public AnalyticsShardRequestManager(SolrParams params, AnalyticsRequestManager manager) {
+    this.manager = manager;
+    this.params = loadParams(params, manager.analyticsRequest);
+  }
+
+  /**
+   * Send out shard requests to each shard in the given collection.
+   * 
+   * @param collection that is being queried
+   * @param zkHost of the solr cloud hosting the collection
+   * @throws IOException if an exception occurs while picking shards or sending requests
+   */
+  public void sendRequests(String collection, String zkHost) throws IOException {
+    this.replicaUrls = new ArrayList<>();
+    this.cloudSolrClient = new Builder()
+        .withZkHost(zkHost)
+        .build();
+    try {
+      this.cloudSolrClient.connect();
+      pickShards(collection);
+      streamFromShards();
+    } finally {
+      cloudSolrClient.close();
+    }
+  }
+  
+  /**
+   * Pick one replica from each shard to send the shard requests to.
+   * 
+   * @param collection that is being queried
+   * @throws IOException if an exception occurs while finding replicas
+   */
+  protected void pickShards(String collection) throws IOException {
+    try {
+
+      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Set<String> liveNodes = clusterState.getLiveNodes();
+
+      Collection<Slice> slices = clusterState.getCollection(collection).getActiveSlices();
+
+      for(Slice slice : slices) {
+        Collection<Replica> replicas = slice.getReplicas();
+        List<Replica> shuffler = new ArrayList<>();
+        for(Replica replica : replicas) {
+          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+          shuffler.add(replica);
+        }
+
+        Collections.shuffle(shuffler, new Random());
+        Replica rep = shuffler.get(0);
+        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+        String url = zkProps.getCoreUrl();
+        replicaUrls.add(url);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Send a shard request to each chosen replica, streaming 
+   * the responses back to the {@link AnalyticsRequestManager}
+   * through the {@link AnalyticsShardResponseParser}.
+   * <p>
+   * A thread pool is used to send the requests simultaneously, 
+   * and therefore importing the results is also done in parallel.
+   * However the manager can only import one shard response at a time,
+   * so the {@link AnalyticsShardResponseParser} is blocked until each import is finished.
+   * 
+   * @throws IOException if an exception occurs while sending requests.
+   */
+  private void streamFromShards() throws IOException {
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("SolrAnalyticsStream"));
+    List<Future<SolrException>> futures = new ArrayList<>();
+    List<AnalyticsShardRequester> openers = new ArrayList<>();
+    for (String replicaUrl : replicaUrls) {
+      AnalyticsShardRequester opener = new AnalyticsShardRequester(replicaUrl);
+      openers.add(opener);
+      Future<SolrException> future = service.submit(opener);
+      futures.add(future);
+    }
+    try {
+      for (Future<SolrException> f : futures) {
+        SolrException e = f.get();
+        if (e != null) {
+          throw e;
+        }
+      }
+    } catch (InterruptedException e1) {
+      throw new RuntimeException(e1);
+    } catch (ExecutionException e1) {
+      throw new RuntimeException(e1);
+    } finally {
+      service.shutdown();
+      for (AnalyticsShardRequester opener : openers) {
+        opener.close();
+      }
+    }
+  }
+
+  /**
+   * Create a {@link SolrParams} for shard requests. The only parameters that are copied over from
+   * the original search request are "q" and "fq".
+   * 
+   * <p>
+   * The request is sent to the {@link AnalyticsHandler} and the output will be encoded in the analytics bit-stream
+   * format generated by the {@link AnalyticsShardResponseWriter}.
+   * 
+   * @param paramsIn of the original solr request
+   * @param analyticsRequest string representation
+   * @return shard request SolrParams
+   */
+  private static SolrParams loadParams(SolrParams paramsIn, String analyticsRequest) {
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+
+    solrParams.add(CommonParams.QT, AnalyticsHandler.NAME);
+    solrParams.add(CommonParams.WT, AnalyticsShardResponseWriter.NAME);
+    solrParams.add(CommonParams.Q, paramsIn.get(CommonParams.Q));
+    solrParams.add(CommonParams.FQ, paramsIn.getParams(CommonParams.FQ));
+    solrParams.add(AnalyticsRequestParser.analyticsParamName, analyticsRequest);
+
+    return solrParams;
+  }
+
+  /**
+   * A class that opens a connection to a given solr instance, a selected replica of the queried collection,
+   * and sends a analytics request to the {@link AnalyticsHandler}. The results are processed by an
+   * {@link AnalyticsShardResponseParser} constructed with the {@link AnalyticsRequestManager} passed
+   * to the parent {@link AnalyticsShardRequestManager}.
+   */
+  protected class AnalyticsShardRequester implements Callable<SolrException> {
+    private String baseUrl;
+    HttpSolrClient client;
+
+    /**
+     * Create a requester for analytics shard data.
+     * 
+     * @param baseUrl of the replica to send the request to
+     */
+    public AnalyticsShardRequester(String baseUrl) {
+      this.baseUrl = baseUrl;
+      this.client = null;
+    }
+    
+    /**
+     * Send the analytics request to the shard.
+     */
+    @Override
+    public SolrException call() throws Exception {
+      client = new HttpSolrClient.Builder(baseUrl).build();
+      QueryRequest query = new QueryRequest( params );
+      query.setPath(AnalyticsHandler.NAME);
+      query.setResponseParser(new AnalyticsShardResponseParser(manager));
+      query.setMethod(SolrRequest.METHOD.POST);
+      NamedList<Object> exception = client.request(query);
+      if (exception.size() > 0) {
+        return (SolrException)exception.getVal(0);
+      }
+      return null;
+    }
+    
+    /**
+     * Close the connection to the solr instance.
+     * 
+     * @throws IOException if an error occurs while closing the connection
+     */
+    public void close() throws IOException {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardResponseParser.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardResponseParser.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardResponseParser.java
new file mode 100644
index 0000000..c7f4094
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardResponseParser.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream;
+
+import org.apache.solr.analytics.AnalyticsRequestManager;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.AnalyticsHandler;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.Reader;
+
+/**
+ * This parser initiates a merge of an Analytics Shard Response, sent from the {@link AnalyticsHandler}.
+ * 
+ * The input stream is immediately sent to the given {@link AnalyticsRequestManager} to merge.
+ */
+public class AnalyticsShardResponseParser extends ResponseParser {
+  public static final String BINARY_CONTENT_TYPE = "application/octet-stream";
+  public static final String STREAM = "application/octet-stream";
+  
+  private final AnalyticsRequestManager manager;
+
+  /**
+   * 
+   * @param manager the manager of the current Analytics Request, will manage the merging of shard data
+   */
+  public AnalyticsShardResponseParser(AnalyticsRequestManager manager) {
+    this.manager = manager;
+  }
+
+  @Override
+  public String getWriterType() {
+    return "analytics_shard_stream";
+  }
+
+  @Override
+  public NamedList<Object> processResponse(InputStream body, String encoding) {
+    DataInputStream input = new DataInputStream(body);
+    //check to see if the response is an exception
+    NamedList<Object> exception = new NamedList<>();
+    try {
+      if (input.readBoolean()) {
+        manager.importShardData(input);
+      } else {
+        exception.add("Exception", new ObjectInputStream(input).readObject());
+      }
+    } catch (IOException e) {
+      exception.add("Exception", new SolrException(ErrorCode.SERVER_ERROR, "Couldn't process analytics shard response", e));
+    } catch (ClassNotFoundException e1) {
+      throw new RuntimeException(e1);
+    }
+    return exception;
+  }
+
+  @Override
+  public String getContentType() {
+    return BINARY_CONTENT_TYPE;
+  }
+
+  @Override
+  public String getVersion() {
+    return "1";
+  }
+
+  @Override
+  public NamedList<Object> processResponse(Reader reader) {
+    throw new RuntimeException("Cannot handle character stream");
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/package-info.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/package-info.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/package-info.java
new file mode 100644
index 0000000..31563cd
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+/** 
+ * Classes to manage the I/O between shards.
+ */
+package org.apache.solr.analytics.stream;
+
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanArrayReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanArrayReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanArrayReservation.java
new file mode 100644
index 0000000..b0ca38d
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanArrayReservation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.BooleanDataArrayReader;
+import org.apache.solr.analytics.stream.reservation.write.BooleanDataArrayWriter;
+import org.apache.solr.analytics.util.function.BooleanConsumer;
+
+public class BooleanArrayReservation extends ReductionDataArrayReservation<BooleanConsumer, BooleanSupplier> {
+  
+  public BooleanArrayReservation(BooleanConsumer applier, IntConsumer sizeApplier, BooleanSupplier extractor, IntSupplier sizeExtractor) {
+    super(applier, sizeApplier, extractor, sizeExtractor);
+  }
+
+  @Override
+  public BooleanDataArrayReader createReadStream(DataInput input) {
+    return new BooleanDataArrayReader(input, applier, sizeApplier);
+  }
+
+  @Override
+  public BooleanDataArrayWriter createWriteStream(DataOutput output) {
+    return new BooleanDataArrayWriter(output, extractor, sizeExtractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanCheckedReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanCheckedReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanCheckedReservation.java
new file mode 100644
index 0000000..1c3d9ec
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanCheckedReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.BooleanCheckedDataReader;
+import org.apache.solr.analytics.stream.reservation.write.BooleanCheckedDataWriter;
+import org.apache.solr.analytics.util.function.BooleanConsumer;
+
+public class BooleanCheckedReservation extends ReductionCheckedDataReservation<BooleanConsumer, BooleanSupplier> {
+  
+  public BooleanCheckedReservation(BooleanConsumer applier, BooleanSupplier extractor, BooleanSupplier exists) {
+    super(applier, extractor, exists);
+  }
+
+  @Override
+  public BooleanCheckedDataReader createReadStream(DataInput input) {
+    return new BooleanCheckedDataReader(input, applier);
+  }
+
+  @Override
+  public BooleanCheckedDataWriter createWriteStream(DataOutput output) {
+    return new BooleanCheckedDataWriter(output, extractor, exists);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanReservation.java
new file mode 100644
index 0000000..2e6b718
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/BooleanReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.BooleanDataReader;
+import org.apache.solr.analytics.stream.reservation.write.BooleanDataWriter;
+import org.apache.solr.analytics.util.function.BooleanConsumer;
+
+public class BooleanReservation extends ReductionDataReservation<BooleanConsumer, BooleanSupplier> {
+  
+  public BooleanReservation(BooleanConsumer applier, BooleanSupplier extractor) {
+    super(applier, extractor);
+  }
+
+  @Override
+  public BooleanDataReader createReadStream(DataInput input) {
+    return new BooleanDataReader(input, applier);
+  }
+
+  @Override
+  public BooleanDataWriter createWriteStream(DataOutput output) {
+    return new BooleanDataWriter(output, extractor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleArrayReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleArrayReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleArrayReservation.java
new file mode 100644
index 0000000..6acf657
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleArrayReservation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoubleSupplier;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.DoubleDataArrayReader;
+import org.apache.solr.analytics.stream.reservation.write.DoubleDataArrayWriter;
+
+public class DoubleArrayReservation extends ReductionDataArrayReservation<DoubleConsumer, DoubleSupplier> {
+  
+  public DoubleArrayReservation(DoubleConsumer applier, IntConsumer sizeApplier, DoubleSupplier extractor, IntSupplier sizeExtractor) {
+    super(applier, sizeApplier, extractor, sizeExtractor);
+  }
+
+  @Override
+  public DoubleDataArrayReader createReadStream(DataInput input) {
+    return new DoubleDataArrayReader(input, applier, sizeApplier);
+  }
+
+  @Override
+  public DoubleDataArrayWriter createWriteStream(DataOutput output) {
+    return new DoubleDataArrayWriter(output, extractor, sizeExtractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleCheckedReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleCheckedReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleCheckedReservation.java
new file mode 100644
index 0000000..a1dd461
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleCheckedReservation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoubleSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.DoubleCheckedDataReader;
+import org.apache.solr.analytics.stream.reservation.write.DoubleCheckedDataWriter;
+
+public class DoubleCheckedReservation extends ReductionCheckedDataReservation<DoubleConsumer, DoubleSupplier> {
+  
+  public DoubleCheckedReservation(DoubleConsumer applier, DoubleSupplier extractor, BooleanSupplier exists) {
+    super(applier, extractor, exists);
+  }
+
+  @Override
+  public DoubleCheckedDataReader createReadStream(DataInput input) {
+    return new DoubleCheckedDataReader(input, applier);
+  }
+
+  @Override
+  public DoubleCheckedDataWriter createWriteStream(DataOutput output) {
+    return new DoubleCheckedDataWriter(output, extractor, exists);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleReservation.java
new file mode 100644
index 0000000..8ef3bef
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/DoubleReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoubleSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.DoubleDataReader;
+import org.apache.solr.analytics.stream.reservation.write.DoubleDataWriter;
+
+public class DoubleReservation extends ReductionDataReservation<DoubleConsumer, DoubleSupplier> {
+  
+  public DoubleReservation(DoubleConsumer applier, DoubleSupplier extractor) {
+    super(applier, extractor);
+  }
+
+  @Override
+  public DoubleDataReader createReadStream(DataInput input) {
+    return new DoubleDataReader(input, applier);
+  }
+
+  @Override
+  public DoubleDataWriter createWriteStream(DataOutput output) {
+    return new DoubleDataWriter(output, extractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatArrayReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatArrayReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatArrayReservation.java
new file mode 100644
index 0000000..702106d
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatArrayReservation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.FloatDataArrayReader;
+import org.apache.solr.analytics.stream.reservation.write.FloatDataArrayWriter;
+import org.apache.solr.analytics.util.function.FloatConsumer;
+import org.apache.solr.analytics.util.function.FloatSupplier;
+
+public class FloatArrayReservation extends ReductionDataArrayReservation<FloatConsumer, FloatSupplier> {
+  
+  public FloatArrayReservation(FloatConsumer applier, IntConsumer sizeApplier, FloatSupplier extractor, IntSupplier sizeExtractor) {
+    super(applier, sizeApplier, extractor, sizeExtractor);
+  }
+
+  @Override
+  public FloatDataArrayReader createReadStream(DataInput input) {
+    return new FloatDataArrayReader(input, applier, sizeApplier);
+  }
+
+  @Override
+  public FloatDataArrayWriter createWriteStream(DataOutput output) {
+    return new FloatDataArrayWriter(output, extractor, sizeExtractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatCheckedReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatCheckedReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatCheckedReservation.java
new file mode 100644
index 0000000..581772c
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatCheckedReservation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.FloatCheckedDataReader;
+import org.apache.solr.analytics.stream.reservation.write.FloatCheckedDataWriter;
+import org.apache.solr.analytics.util.function.FloatConsumer;
+import org.apache.solr.analytics.util.function.FloatSupplier;
+
+public class FloatCheckedReservation extends ReductionCheckedDataReservation<FloatConsumer, FloatSupplier> {
+  
+  public FloatCheckedReservation(FloatConsumer applier, FloatSupplier extractor, BooleanSupplier exists) {
+    super(applier, extractor, exists);
+  }
+
+  @Override
+  public FloatCheckedDataReader createReadStream(DataInput input) {
+    return new FloatCheckedDataReader(input, applier);
+  }
+
+  @Override
+  public FloatCheckedDataWriter createWriteStream(DataOutput output) {
+    return new FloatCheckedDataWriter(output, extractor, exists);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatReservation.java
new file mode 100644
index 0000000..c0ea5f0
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/FloatReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.solr.analytics.stream.reservation.read.FloatDataReader;
+import org.apache.solr.analytics.stream.reservation.write.FloatDataWriter;
+import org.apache.solr.analytics.util.function.FloatConsumer;
+import org.apache.solr.analytics.util.function.FloatSupplier;
+
+public class FloatReservation extends ReductionDataReservation<FloatConsumer, FloatSupplier> {
+  
+  public FloatReservation(FloatConsumer applier, FloatSupplier extractor) {
+    super(applier, extractor);
+  }
+
+  @Override
+  public FloatDataReader createReadStream(DataInput input) {
+    return new FloatDataReader(input, applier);
+  }
+
+  @Override
+  public FloatDataWriter createWriteStream(DataOutput output) {
+    return new FloatDataWriter(output, extractor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntArrayReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntArrayReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntArrayReservation.java
new file mode 100644
index 0000000..e3639a1
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntArrayReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.IntDataArrayReader;
+import org.apache.solr.analytics.stream.reservation.write.IntDataArrayWriter;
+
+public class IntArrayReservation extends ReductionDataArrayReservation<IntConsumer, IntSupplier> {
+  
+  public IntArrayReservation(IntConsumer applier, IntConsumer sizeApplier, IntSupplier extractor, IntSupplier sizeExtractor) {
+    super(applier, sizeApplier, extractor, sizeExtractor);
+  }
+
+  @Override
+  public IntDataArrayReader createReadStream(DataInput input) {
+    return new IntDataArrayReader(input, applier, sizeApplier);
+  }
+
+  @Override
+  public IntDataArrayWriter createWriteStream(DataOutput output) {
+    return new IntDataArrayWriter(output, extractor, sizeExtractor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntCheckedReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntCheckedReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntCheckedReservation.java
new file mode 100644
index 0000000..c0a7cf2
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntCheckedReservation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.IntCheckedDataReader;
+import org.apache.solr.analytics.stream.reservation.write.IntCheckedDataWriter;
+
+public class IntCheckedReservation extends ReductionCheckedDataReservation<IntConsumer, IntSupplier> {
+  
+  public IntCheckedReservation(IntConsumer applier, IntSupplier extractor, BooleanSupplier exists) {
+    super(applier, extractor, exists);
+  }
+
+  @Override
+  public IntCheckedDataReader createReadStream(DataInput input) {
+    return new IntCheckedDataReader(input, applier);
+  }
+
+  @Override
+  public IntCheckedDataWriter createWriteStream(DataOutput output) {
+    return new IntCheckedDataWriter(output, extractor, exists);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntReservation.java
new file mode 100644
index 0000000..cb66b58
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/IntReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.IntDataReader;
+import org.apache.solr.analytics.stream.reservation.write.IntDataWriter;
+
+public class IntReservation extends ReductionDataReservation<IntConsumer, IntSupplier> {
+  
+  public IntReservation(IntConsumer applier, IntSupplier extractor) {
+    super(applier, extractor);
+  }
+
+  @Override
+  public IntDataReader createReadStream(DataInput input) {
+    return new IntDataReader(input, applier);
+  }
+
+  @Override
+  public IntDataWriter createWriteStream(DataOutput output) {
+    return new IntDataWriter(output, extractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongArrayReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongArrayReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongArrayReservation.java
new file mode 100644
index 0000000..980bc2b
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongArrayReservation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.LongDataArrayReader;
+import org.apache.solr.analytics.stream.reservation.write.LongDataArrayWriter;
+
+import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
+
+public class LongArrayReservation extends ReductionDataArrayReservation<LongConsumer, LongSupplier> {
+  
+  public LongArrayReservation(LongConsumer applier, IntConsumer sizeApplier, LongSupplier extractor, IntSupplier sizeExtractor) {
+    super(applier, sizeApplier, extractor, sizeExtractor);
+  }
+
+  @Override
+  public LongDataArrayReader createReadStream(DataInput input) {
+    return new LongDataArrayReader(input, applier, sizeApplier);
+  }
+
+  @Override
+  public LongDataArrayWriter createWriteStream(DataOutput output) {
+    return new LongDataArrayWriter(output, extractor, sizeExtractor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongCheckedReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongCheckedReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongCheckedReservation.java
new file mode 100644
index 0000000..e1626e5
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongCheckedReservation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.LongCheckedDataReader;
+import org.apache.solr.analytics.stream.reservation.write.LongCheckedDataWriter;
+
+public class LongCheckedReservation extends ReductionCheckedDataReservation<LongConsumer, LongSupplier> {
+  
+  public LongCheckedReservation(LongConsumer applier, LongSupplier extractor, BooleanSupplier exists) {
+    super(applier, extractor, exists);
+  }
+
+  @Override
+  public LongCheckedDataReader createReadStream(DataInput input) {
+    return new LongCheckedDataReader(input, applier);
+  }
+
+  @Override
+  public LongCheckedDataWriter createWriteStream(DataOutput output) {
+    return new LongCheckedDataWriter(output, extractor, exists);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d5963beb/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongReservation.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongReservation.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongReservation.java
new file mode 100644
index 0000000..daf8ec3
--- /dev/null
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/reservation/LongReservation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.analytics.stream.reservation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
+
+import org.apache.solr.analytics.stream.reservation.read.LongDataReader;
+import org.apache.solr.analytics.stream.reservation.write.LongDataWriter;
+
+public class LongReservation extends ReductionDataReservation<LongConsumer, LongSupplier> {
+  
+  public LongReservation(LongConsumer applier, LongSupplier extractor) {
+    super(applier, extractor);
+  }
+
+  @Override
+  public LongDataReader createReadStream(DataInput input) {
+    return new LongDataReader(input, applier);
+  }
+
+  @Override
+  public LongDataWriter createWriteStream(DataOutput output) {
+    return new LongDataWriter(output, extractor);
+  }
+}
\ No newline at end of file