You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:51 UTC

[50/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
deleted file mode 100644
index 66bff5b..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * JSON univariate aggregator implementation wrapping a {@code StorelessUnivariateStatistic}.
- */
-public class JsonStorelessStatistic implements JsonUnivariateAggregator {
-        
-    private final Statistic stat;
-    private final StorelessUnivariateStatistic statImpl;
-    
-    public JsonStorelessStatistic(Statistic stat, StorelessUnivariateStatistic statImpl) {
-        this.stat = stat;
-        this.statImpl = statImpl;
-    }
-
-    @Override
-    public void clear(JsonElement partition, int n) {
-        statImpl.clear();
-    }
-
-    @Override
-    public void increment(double v) {
-        statImpl.increment(v);
-    }
-
-    @Override
-    public void result(JsonElement partition, JsonObject result) {        
-        double rv = statImpl.getResult();
-        
-        if (Double.isFinite(rv))
-            result.addProperty(stat.name(), rv);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
deleted file mode 100644
index 6e3b76c..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * Univariate regression aggregates.
- *
- */
-public enum Regression implements JsonUnivariateAggregate {
-    
-    /**
-     * Calculate the slope of a single variable.
-     * The slope is calculated using the first
-     * order of a ordinary least squares
-     * linear regression.
-     * The list of values for the single
-     * single variable are processed as Y-values
-     * that are evenly spaced on the X-axis.
-     * <BR>
-     * This is useful as a simple determination
-     * if the variable is increasing or decreasing.
-     * <BR>
-     * The slope value is represented as a {@code double}
-     * with the key {@code SLOPE} in the aggregate result.
-     * <BR>
-     * If the window to be aggregated contains less than
-     * two values then no regression is performed.
-     */
-    SLOPE() {
-        @Override
-        public JsonUnivariateAggregator get() {
-            return new JsonOLS(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
deleted file mode 100644
index 49f2a92..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
-import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
-
-import edgent.analytics.math3.json.JsonAnalytics;
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * Statistic implementations.
- * 
- * Univariate statistic aggregate calculations against a value
- * extracted from a {@code JsonObject}.
- * 
- * @see JsonAnalytics
- */
-public enum Statistic implements JsonUnivariateAggregate {
-    
-    /**
-     * Calculate the arithmetic mean.
-     * The mean value is represented as a {@code double}
-     * with the key {@code MEAN} in the aggregate result.
-     */
-    MEAN(new Mean()),
-    /**
-     * Calculate the minimum.
-     * The minimum value is represented as a {@code double}
-     * with the key {@code MIN} in the aggregate result.
-     */
-    MIN(new Min()),
-    /**
-     * Calculate the maximum.
-     * The maximum value is represented as a {@code double}
-     * with the key {@code MAXIMUM} in the aggregate result.
-     */
-    MAX(new Max()),
-    /**
-     * Calculate the sum.
-     * The sum is represented as a {@code double}
-     * with the key {@code SUM} in the aggregate result.
-     */
-    SUM(new Sum()),
-    /**
-     * Calculate the standard deviation.
-     */
-    STDDEV(new StandardDeviation());
-
-    private final StorelessUnivariateStatistic statImpl;
-
-    private Statistic(StorelessUnivariateStatistic statImpl) {
-        this.statImpl = statImpl;
-        statImpl.clear();
-    }
-
-    /**
-     * Return a new instance of this statistic implementation.
-     * @return A new instance of this statistic implementation.
-     */
-    @Override
-    public JsonUnivariateAggregator get() {
-        return new JsonStorelessStatistic(this, statImpl.copy());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
deleted file mode 100644
index c95591f..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Statistical algorithms using Apache Commons Math.
- */
-package edgent.analytics.math3.stat;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
new file mode 100644
index 0000000..c6e9108
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
@@ -0,0 +1,443 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.ToDoubleFunction;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Apache Commons Math analytics for streams with JSON tuples.
+ *
+ */
+public class JsonAnalytics {
+    
+    /**
+     * Aggregate against a single {@code Numeric} variable contained in a JSON object.
+     * 
+     * The returned stream contains a tuple for each execution performed against a window partition.
+     * The tuple is a {@code JsonObject} containing:
+     * <UL>
+     * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+     * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code valueProperty}.
+     * This results object contains the results of all aggregations defined by {@code aggregates} against
+     * {@code double} property with key {@code valueProperty}.
+     * <BR>
+     * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+     * object.
+     * </LI>
+     * </UL>
+     * <P>
+     * For example if the window contains these three tuples (pseudo JSON) for
+     * partition 3:
+     * <BR>
+     * <code>{id=3,reading=2.0}, {id=3,reading=2.6}, {id=3,reading=1.8}</code>
+     * <BR>
+     * the resulting aggregation for the stream returned by:
+     * <BR>
+     * {@code aggregate(window, "id", "reading", Statistic.MIN, Statistic.MAX)}
+     * <BR>
+     * would contain this tuple with the maximum and minimum values in the {@code reading}
+     * JSON object:
+     * <BR>
+     * <code>{id=3, reading={MIN=1.8, MAX=2.6}}</code>
+     * </P>
+     * @param <K> Partition type
+     * 
+     * @param window Window to aggregate over.
+     * @param resultPartitionProperty Property to store the partition key in tuples on the returned stream.
+     * @param valueProperty JSON property containing the value to aggregate.
+     * @param aggregates Which aggregations to be performed.
+     * @return Stream that will contain aggregations.
+     */
+    public static <K extends JsonElement> TStream<JsonObject> aggregate(
+            TWindow<JsonObject, K> window,
+            String resultPartitionProperty,
+            String valueProperty,
+            JsonUnivariateAggregate... aggregates) {
+        return aggregate(window, resultPartitionProperty, valueProperty, j -> j.get(valueProperty).getAsDouble(), aggregates);
+
+    }
+    
+    /**
+     * Aggregate against a single {@code Numeric} variable contained in a JSON object.
+     * 
+     * The returned stream contains a tuple for each execution performed against a window partition.
+     * The tuple is a {@code JsonObject} containing:
+     * <UL>
+     * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+     * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code resultProperty}.
+     * This results object contains the results of all aggregations defined by {@code aggregates} against
+     * value returned by {@code valueGetter}.
+     * <BR>
+     * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+     * object.
+     * </LI>
+     * </UL>
+     * 
+     * @param <K> Partition type
+     * @param window Window to aggregate over.
+     * @param resultPartitionProperty Property to store the partition key in tuples on the returned stream.
+     * @param resultProperty Property to store the aggregations in tuples on the returned stream.
+     * @param valueGetter How to obtain the single variable from input tuples.
+     * @param aggregates Which aggregations to be performed.
+     * @return Stream that will contain aggregations.
+     */
+    public static <K extends JsonElement> TStream<JsonObject> aggregate(
+            TWindow<JsonObject, K> window,
+            String resultPartitionProperty,
+            String resultProperty,
+            ToDoubleFunction<JsonObject> valueGetter,
+            JsonUnivariateAggregate... aggregates) {
+
+        return window.aggregate(aggregateList(
+                resultPartitionProperty,
+                resultProperty,
+                valueGetter,
+                aggregates
+                ));
+    }
+    
+    /**
+     * Create a Function that aggregates against a single {@code Numeric}
+     * variable contained in a JSON object.
+     * 
+     * Calling {@code apply(List<JsonObject>, K)} on the returned {@code BiFunction}
+     * returns a {@link JsonObject} containing:
+     * <UL>
+     * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+     * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code resultProperty}.
+     * This results object contains the results of all aggregations defined by {@code aggregates}
+     * against the value returned by {@code valueGetter}.
+     * <BR>
+     * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+     * object.
+     * </LI>
+     * </UL>
+     * <P>
+     * For example if the list contains these three tuples (pseudo JSON) for
+     * partition 3:
+     * <BR>
+     * <code>{id=3,reading=2.0}, {id=3,reading=2.6}, {id=3,reading=1.8}</code>
+     * <BR>
+     * the resulting aggregation for the JsonObject returned by:
+     * <BR>
+     * {@code aggregateList("id", "reading", j -> j.get("reading").getAsDouble(), Statistic.MIN, Statistic.MAX).apply(list, 3)}
+     * <BR>
+     * would be this tuple with the maximum and minimum values in the {@code reading}
+     * JSON object:
+     * <BR>
+     * <code>{id=3, reading={MIN=1.8, MAX=2.6}}</code>
+     * </P>
+     * @param <K> Partition type
+     * 
+     * @param resultPartitionProperty Property to store the partition key in tuples on the returned stream.
+     * @param resultProperty Property to store the aggregations in the returned JsonObject.
+     * @param valueGetter How to obtain the single variable from input tuples.
+     * @param aggregates Which aggregations to be performed.
+     * @return Function that performs the aggregations.
+     */
+    public static <K extends JsonElement> 
+    BiFunction<List<JsonObject>, K, JsonObject> aggregateList(
+            String resultPartitionProperty,
+            String resultProperty,
+            ToDoubleFunction<JsonObject> valueGetter,
+            JsonUnivariateAggregate... aggregates) {
+
+        BiFunction<List<JsonObject>, K, JsonObject> function = (tuples, partition) -> {
+            
+            final JsonUnivariateAggregator[] aggregators = new JsonUnivariateAggregator[aggregates.length];
+            for (int i = 0; i < aggregates.length; i++) {
+                aggregators[i] = aggregates[i].get();
+            }     
+            
+            final JsonObject result = new JsonObject();
+            result.add(resultPartitionProperty, partition);
+            JsonObject aggregateResults = new JsonObject();
+            result.add(resultProperty, aggregateResults);
+
+            final int n = tuples.size();
+            aggregateResults.addProperty(JsonUnivariateAggregate.N, n);
+            
+            if (n != 0) {
+
+                for (JsonUnivariateAggregator agg : aggregators) {
+                    agg.clear(partition, n);
+                }
+                for (JsonObject tuple : tuples) {
+                    double v = valueGetter.applyAsDouble(tuple);
+                    for (JsonUnivariateAggregator agg : aggregators) {
+                        agg.increment(v);
+                    }
+                }
+                for (JsonUnivariateAggregator agg : aggregators) {
+                    agg.result(partition, aggregateResults);
+                }
+            }
+
+            return result;
+        };
+        
+        return function;
+    }
+    
+    /**
+     * Aggregate against multiple {@code Numeric} variables contained in a JSON object.
+     * <P>
+     * This is a multi-variable analog of {@link #aggregate(TWindow, String, String, JsonUnivariateAggregate...) aggregate()}
+     * </P>
+     * <P>
+     * See {@link #mvAggregateList(String, String, List) mvAggregateList()} for
+     * a description of the aggregation processing and result stream.
+     * </P>
+     * <P>
+     * Sample use:
+     * <pre>{@code
+     * // Ingest the data.  The JsonObject tuples have properties:
+     * //   "id" - the partitionKey
+     * //   "tx" - a numeric data variable
+     * //   "rx" - a numeric data variable
+     * TStream<JsonObject> ingestData = ...
+     * 
+     * // Define the tuple variables and their aggregations to compute
+     * List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
+     * aggSpecs.add(mkAggregationSpec("tx", Statistic.MIN, Statistic.MAX));
+     * aggSpecs.add(mkAggregationSpec("rx", Statistic.MEAN));
+     * 
+     * // Create the window over which to aggregate
+     * TWindow<JsonObject, JsonElement> window = 
+     *    ingestData.last(5, TimeUnit.SECONDS, jo -> jo.get("id"));
+     * 
+     * // Create a stream with the aggregations. The result tuples have properties:
+     * //   "id" - the partitionKey
+     * //   "aggResults" - the aggregation results
+     * TStream<JsonObject> aggResults = 
+     *    mvAggregate(window, "id", "aggResults", aggSpecs);
+     *    
+     * // Create a stream of JsonObject tuples with just the average "rx"
+     * TStream<JsonObject> avgRx = aggResults.map(
+     *    jo -> {
+     *      JsonObject result = new JsonObject();
+     *      result.add("id", jo.get("id"));
+     *      result.add("avgRx", getMvAggregate(jo, "aggResults", "rx", Statistic.MEAN));
+     *      return result;
+     *    });
+     * }</pre>
+     * 
+     * @param window the window to compute aggregations over
+     * @param resultPartitionKeyProperty name of the partition key property in the result
+     * @param resultProperty name of the aggregation results property in the result
+     * @param aggregateSpecs see {@link #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()}
+     * @return {@code TStream<JsonObject>} with aggregation results
+     * 
+     * @see #mvAggregateList(String, String, List) mvAggregateList()
+     * @see #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()
+     * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+     */
+    public static <K extends JsonElement> TStream<JsonObject> mvAggregate(
+        TWindow<JsonObject, K> window,
+        String resultPartitionKeyProperty,
+        String resultProperty,
+        List<Pair<String, JsonUnivariateAggregate[]>> aggregateSpecs) {
+
+      return window.aggregate(mvAggregateList(
+              resultPartitionKeyProperty,
+              resultProperty,
+              aggregateSpecs
+              ));
+    }
+
+    /**
+     * Create an aggregation specification.
+     * <P>
+     * The aggregation specification specifies a variable name and
+     * the aggregates to compute on it.
+     * </P>
+     * <P>
+     * The specification can be used with {@link #mvAggregateList(String, String, List) mvAggregateList()}.
+     * 
+     * @param variableName the name of a {@code Numeric} data variable in a JSON object 
+     * @param aggregates the aggregates to compute for the variable
+     * @return the aggregation specification
+     */
+    public static Pair<String, JsonUnivariateAggregate[]> 
+    mkAggregationSpec(String variableName, JsonUnivariateAggregate... aggregates) {
+      return new Pair<String, JsonUnivariateAggregate[]>(variableName, aggregates);
+    }
+    
+    /**
+     * Create a Function that aggregates multiple {@code Numeric}
+     * variables contained in a JSON object.
+     * <P>
+     * This is a multi-variable analog of {@link JsonAnalytics#aggregateList(String, String, org.apache.edgent.function.ToDoubleFunction, JsonUnivariateAggregate...) aggregateList()}
+     * <P>
+     * The overall multi-variable aggregation result is a JSON object
+     * with properties:
+     * <ul>
+     * <li>{@code resultPartitionKeyProperty} whose value is the tuple's partition key</li>
+     * <li>{@code resultProperty} whose value is a JSON object containing
+     *     a property for each variable aggregation.  The property names
+     *     correspond to the variable names from the {@code aggregateSpecs}
+     *     and the values are the aggregation results for the variable.
+     *     The aggregation results for a variable are a JSON object 
+     *     having a property for each aggregation name and its value.</li>
+     * </ul>
+     * <P>
+     * For example if the list contains these three tuples (pseudo JSON) for
+     * partition 3:
+     * <BR>
+     * <code>{id=3,tx=2.0,rx=1.0,...}, {id=3,tx=2.6,rx=2.0,...}, {id=3,tx=1.8,rx=3.0,...}</code>
+     * <BR>
+     * the resulting aggregation JsonObject returned is:
+     * <BR>
+     * <code>{id=3, aggData={tx={MIN=1.8, MAX=2.6}, rx={MEAN=2.0}}}</code>
+     * <BR>
+     * for the invocation:
+     * <BR>
+     * <code>mvAggregateList("id", "aggData", aggSpecs).apply(list, 3)</code>
+     * <BR>
+     * where {@code aggSpecs} is:
+     * <BR>
+     * {@code
+     * aggSpecs.add(mkAggregationSpec("tx", Statistic.MIN, Statistic.MAX));
+     * aggSpecs.add(mkAggregationSpec("rx", Statistic.MEAN));
+     * }
+     * </P>
+     * <P>
+     * {@link #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()}
+     * can be used to extract individual aggregate values from the result.
+     * </P>
+     * 
+     * @param <K> Partition Key as a JsonElement
+     * 
+     * @param resultPartitionKeyProperty name of the partition key property in the result
+     * @param resultProperty name of the aggregation results property in the result
+     * @param aggregateSpecs see {@link #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()}
+     * @return Function that performs the aggregations.
+     * 
+     * @see #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()
+     * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+     */
+    public static <K extends JsonElement> 
+    BiFunction<List<JsonObject>, K, JsonObject> mvAggregateList(
+        String resultPartitionKeyProperty, String resultProperty,
+        List<Pair<String, JsonUnivariateAggregate[]>> aggregateSpecs) {
+      
+      BiFunction<List<JsonObject>, K, JsonObject> function = 
+        (joList, partition) -> {
+          JsonObject joResult = new JsonObject();
+          joResult.add(resultPartitionKeyProperty, partition);
+          
+          JsonObject aggregateResults = new JsonObject();
+          joResult.add(resultProperty, aggregateResults);
+          
+          for (Pair<String, JsonUnivariateAggregate[]> p : aggregateSpecs) {
+            String variableName = p.getFirst();
+            JsonUnivariateAggregate[] aggregates = p.getSecond();
+            
+            // Compute the aggregates for the variable
+            JsonObject jo2 = JsonAnalytics.aggregateList(resultPartitionKeyProperty,
+                resultProperty, jo -> jo.get(variableName).getAsDouble(),
+                aggregates).apply(joList,  partition);
+            
+            // Add the variable's aggregates result to the result
+            aggregateResults.add(variableName, jo2.get(resultProperty).getAsJsonObject());
+          }
+        
+          return joResult;
+        };
+        
+        return function;
+    }
+    
+    /**
+     * Get the value of an aggregate computed by a multi-variable aggregation.
+     * <P>
+     * This convenience method can be used to extract information from a JSON object
+     * created by {@link #mvAggregateList(String, String, List) mvAggregateList()}
+     * or {@link #mvAggregate(TWindow, String, String, List) mvAggregate()}
+     * </P>
+     * <P>
+     * Sample use:
+     * <pre>{@code
+     * ...
+     * TStream<JsonObject> aggResults = mvAggregate(window, "id", "aggResults", aggSpecs);
+     * 
+     * // Create a stream of JsonObject tuples with just the average "tx"
+     * TStream<JsonObject> avgTx = aggResults.map(
+     *    jo -> {
+     *      JsonObject result = new JsonObject();
+     *      result.add("id", jo.get("id"));
+     *      result.add("avgTx", getMvAggregate(jo, "aggResults", "tx", Statistic.MEAN));
+     *      return result;
+     *    });
+     * }</pre>
+     * 
+     * @param jo a JSON object created by {@code mvAggregateList}
+     * @param resultProperty the corresponding value passed to {@code mvAggregateList}
+     * @param variableName the data variable of interest in the multivariable aggregates
+     * @param aggregate the variable's aggregate of interest
+     * @return the variable's aggregate's value as a JsonElement, or {@code null} if that aggregate isn't present for the variable
+     * @throws RuntimeException if {@code resultProperty} or {@code variableName} isn't present in the result
+     * 
+     * @see #hasMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) hasMvAggregate()
+     * @see #mvAggregate(TWindow, String, String, List) mvAggregate()
+     * @see #mvAggregateList(String, String, List) mvAggregateList()
+     */
+    public static JsonElement getMvAggregate(JsonObject jo, String resultProperty, String variableName, JsonUnivariateAggregate aggregate) {
+      return jo.get(resultProperty).getAsJsonObject()
+              .get(variableName).getAsJsonObject()
+              .get(aggregate.name());
+    }
+
+    /**
+     * Check if an aggregation result from a multi-variable aggregation
+     * is present.
+     * 
+     * @param jo a JSON object created by {@code mvAggregateList}
+     * @param resultProperty the corresponding value passed to {@code mvAggregateList}
+     * @param variableName the data variable of interest in the multivariable aggregates
+     * @param aggregate the variable's aggregate of interest
+     * @return true if the specified aggregate is present in the jo, false otherwise.
+     * 
+     * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+     */
+    public static boolean hasMvAggregate(JsonObject jo, String resultProperty, String variableName, JsonUnivariateAggregate aggregate) {
+      JsonElement je = jo.get(resultProperty);
+      if (je != null && je.isJsonObject()) {
+        JsonObject jo2 = je.getAsJsonObject();
+        je = jo2.get(variableName);
+        if (je != null && je.isJsonObject()) {
+          jo2 = je.getAsJsonObject();
+          je = jo2.get(aggregate.name());
+          if (je != null)
+            return true;
+        }
+      }
+      return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
new file mode 100644
index 0000000..f0ff3f6
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Univariate aggregate for a JSON tuple.
+ * This is the declaration of an aggregate that
+ * applications use when declaring a topology.
+ * <P>
+ * Implementations are typically enums such
+ * as {@link org.apache.edgent.analytics.math3.stat.Statistic Statistic}.
+ * </P>
+ * <P>
+ * Every call to {@code get()} must hand back a fresh
+ * {@link JsonUnivariateAggregator aggregator}
+ * that computes this aggregate.
+ * </P>
+ * 
+ * @see JsonAnalytics
+ */
+public interface JsonUnivariateAggregate extends Supplier<JsonUnivariateAggregator> {
+    
+    /**
+     * JSON key under which the number of aggregated
+     * tuples is represented. Value is {@value}.
+     */
+    String N = "N";
+    
+    /**
+     * Name of the aggregate.
+     * The aggregation result is stored under the
+     * JSON key equal to the returned value.
+     * @return Name of the aggregate.
+     */
+    String name();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
new file mode 100644
index 0000000..2e50cbb
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Univariate aggregator for JSON tuples.
+ * This is the runtime counterpart of an aggregate
+ * declared by a {@link JsonUnivariateAggregate}.
+ */
+public interface JsonUnivariateAggregator {
+    
+    /**
+     * Clear the aggregator to prepare for a new aggregation.
+     * @param partitionKey Partition key.
+     * @param n Number of tuples to be aggregated.
+     */
+    void clear(JsonElement partitionKey, int n);
+    
+    /**
+     * Add a value to the aggregation.
+     * @param value Value to be added.
+     */
+    void increment(double value);
+    
+    /**
+     * Place the result of the aggregation into the {@code result}
+     * object. The key for the result must be
+     * {@link JsonUnivariateAggregate#name()} for the corresponding
+     * aggregate. The value of the aggregation may be a primitive value
+     * such as a {@code double} or any valid JSON element.
+     * 
+     * @param partitionKey Partition key.
+     * @param result JSON object that receives the result.
+     */
+    void result(JsonElement partitionKey, JsonObject result);
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
new file mode 100644
index 0000000..be11e22
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * JSON analytics using Apache Commons Math.
+ */
+package org.apache.edgent.analytics.math3.json;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
new file mode 100644
index 0000000..c1d351b
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/** Aggregator computing an ordinary least squares slope over evenly spaced values. */
+class JsonOLS implements JsonUnivariateAggregator {
+    private final Regression type;
+    private final OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
+    /** Interleaved samples {@code y0, x0, y1, x1, ...}; may be null when no aggregation is in progress. */
+    private double[] values;
+    /** Index in {@code values} where the next y-value is stored. */
+    private int yOffset;
+    
+    JsonOLS(Regression type) {
+        this.type = type;
+    }
+
+    /** Allocate room for {@code n} (y, x) pairs and restart at the first slot. */
+    @Override
+    public void clear(JsonElement partition, int n) {
+        values = new double[n*2];
+        yOffset = 0;
+    }
+
+    /** Record {@code v} as the next y-value; its x-value is filled in later. */
+    @Override
+    public void increment(double v) {
+        values[yOffset] = v;
+        yOffset+=2;
+    }
+    
+    /** Assign x = 0, 1, 2, ... to the y-values and load the pairs into the regression. */
+    void setSampleData() {
+        for (int x = 0; x < values.length/2; x++)
+            values[(x*2)+1] = x;
+        ols.newSampleData(values, values.length/2, 1);
+    }
+
+    /** Add the slope under {@code type.name()}; adds nothing when it cannot be computed. */
+    @Override
+    public void result(JsonElement partition, JsonObject result) {
+        // No clear() since the last result(), or fewer than two values: no slope.
+        if (values == null || values.length <= 2)
+            return;
+        setSampleData();
+        double[] regressionParams = ols.estimateRegressionParameters();
+        // [0] is the constant term, [1] the first order term used as the slope.
+        if (regressionParams.length >= 2 && Double.isFinite(regressionParams[1]))
+            result.addProperty(type.name(), regressionParams[1]);
+        values = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
new file mode 100644
index 0000000..702f7c3
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Adapts a Commons Math {@code StorelessUnivariateStatistic} to the
+ * {@link JsonUnivariateAggregator} contract; only finite results are reported.
+ */
+public class JsonStorelessStatistic implements JsonUnivariateAggregator {
+    private final Statistic stat;
+    private final StorelessUnivariateStatistic statImpl;
+    
+    /** Binds the statistic naming the result key to the implementation fed with values. */
+    public JsonStorelessStatistic(Statistic stat, StorelessUnivariateStatistic statImpl) {
+        this.stat = stat;
+        this.statImpl = statImpl;
+    }
+
+    @Override
+    public void clear(JsonElement partition, int n) {
+        statImpl.clear();
+    }
+
+    @Override
+    public void increment(double v) {
+        statImpl.increment(v);
+    }
+
+    @Override
+    public void result(JsonElement partition, JsonObject result) {
+        final double computed = statImpl.getResult();
+        if (!Double.isFinite(computed))
+            return; // NaN and infinities are left out of the result
+        result.addProperty(stat.name(), computed);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
new file mode 100644
index 0000000..02fc4d0
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+/**
+ * Univariate regression aggregates; each constant supplies
+ * the aggregator that computes it.
+ */
+public enum Regression implements JsonUnivariateAggregate {
+    
+    /**
+     * Calculate the slope of a single variable.
+     * The slope is the first order term of an
+     * ordinary least squares linear regression.
+     * <BR>
+     * The values of the variable are treated as Y-values
+     * that are evenly spaced on the X-axis.
+     * <BR>
+     * This is useful as a simple determination
+     * if the variable is increasing or decreasing.
+     * <BR>
+     * The slope value is represented as a {@code double}
+     * with the key {@code SLOPE} in the aggregate result.
+     * <BR>
+     * If the window to be aggregated contains fewer than
+     * two values then no regression is performed
+     * and the result has no {@code SLOPE} entry.
+     */
+    SLOPE {
+        @Override
+        public JsonUnivariateAggregator get() {
+            return new JsonOLS(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
new file mode 100644
index 0000000..1b4b0c1
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.commons.math3.stat.descriptive.rank.Max;
+import org.apache.commons.math3.stat.descriptive.rank.Min;
+import org.apache.commons.math3.stat.descriptive.summary.Sum;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+/**
+ * Statistic implementations.
+ * Each constant is a univariate statistic aggregate calculated
+ * against a value extracted from a {@code JsonObject}.
+ * 
+ * @see JsonAnalytics
+ */
+public enum Statistic implements JsonUnivariateAggregate {
+    
+    /**
+     * Arithmetic mean.
+     * The mean value is represented as a {@code double}
+     * with the key {@code MEAN} in the aggregate result.
+     */
+    MEAN(new Mean()),
+    /**
+     * Minimum.
+     * The minimum value is represented as a {@code double}
+     * with the key {@code MIN} in the aggregate result.
+     */
+    MIN(new Min()),
+    /**
+     * Maximum.
+     * The maximum value is represented as a {@code double}
+     * with the key {@code MAX} in the aggregate result.
+     */
+    MAX(new Max()),
+    /**
+     * Sum.
+     * The sum is represented as a {@code double}
+     * with the key {@code SUM} in the aggregate result.
+     */
+    SUM(new Sum()),
+    /**
+     * Standard deviation.
+     * The standard deviation is represented as a {@code double}
+     * with the key {@code STDDEV} in the aggregate result.
+     */
+    STDDEV(new StandardDeviation());
+
+    private final StorelessUnivariateStatistic prototype;
+
+    Statistic(StorelessUnivariateStatistic statImpl) {
+        statImpl.clear();
+        this.prototype = statImpl;
+    }
+
+    /**
+     * @return a new aggregator backed by its own copy of this statistic's implementation
+     */
+    @Override
+    public JsonUnivariateAggregator get() {
+        return new JsonStorelessStatistic(this, prototype.copy());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
new file mode 100644
index 0000000..1112268
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Statistical algorithms using Apache Commons Math.
+ */
+package org.apache.edgent.analytics.math3.stat;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java b/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
deleted file mode 100644
index 59c68c3..0000000
--- a/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.analytics.math3;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math3.util.Pair;
-import org.junit.Test;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.math3.json.JsonAnalytics;
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.stat.Regression;
-import edgent.analytics.math3.stat.Statistic;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-public class StatisticsTest  extends TopologyAbstractTest implements DirectTestSetup {
-	@Test
-	public void testMin() throws Exception {
-	    Topology topology = newTopology("testMin");
-	    
-	    TStream<JsonObject> aggregate = aggregate(topology, Statistic.MIN);
-	    
-        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
-        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
-        complete(topology, count);
-        assertTrue(count.valid());
-        
-        List<JsonObject> tuples = contents.getResult();
-        assertEquals(11, tuples.size());
-        
-        assertOutputStructure(tuples, Statistic.MIN);
-        
-        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0", "C700"
-        assertResult(tuples, Statistic.MIN, 0, "A", 1.0);
-        assertResult(tuples, Statistic.MIN, 1, "B", 7.0);
-        assertResult(tuples, Statistic.MIN, 2, "C", 4.0);
-        
-        assertResult(tuples, Statistic.MIN, 3, "A", 1.0);
-        assertResult(tuples, Statistic.MIN, 4, "B", 3.0);
-        assertResult(tuples, Statistic.MIN, 5, "C", 4.0);
-        
-        assertResult(tuples, Statistic.MIN, 6, "A", 4.0);
-        assertResult(tuples, Statistic.MIN, 7, "B", 3.0);
-        assertResult(tuples, Statistic.MIN, 8, "B", 13.0);
-        
-        assertResult(tuples, Statistic.MIN, 9, "A", 0.0);
-        assertResult(tuples, Statistic.MIN, 10, "C", 99.0);
-	}
-	
-    @Test
-    public void testMaxMean() throws Exception {
-        Topology topology = newTopology("testMaxMean");
-
-        TStream<JsonObject> aggregate = aggregate(topology, Statistic.MAX, Statistic.MEAN);
-
-        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
-        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
-        complete(topology, count);
-        assertTrue(count.valid());
-
-        List<JsonObject> tuples = contents.getResult();
-        assertEquals(11, tuples.size());
-
-        assertOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
-
-        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0",
-        // "C700"
-        assertResult(tuples, Statistic.MAX, 0, "A", 1.0);
-        assertResult(tuples, Statistic.MAX, 1, "B", 7.0);
-        assertResult(tuples, Statistic.MAX, 2, "C", 4.0);
-
-        assertResult(tuples, Statistic.MAX, 3, "A", 4.0);
-        assertResult(tuples, Statistic.MAX, 4, "B", 7.0);
-        assertResult(tuples, Statistic.MAX, 5, "C", 99.0);
-
-        assertResult(tuples, Statistic.MAX, 6, "A", 102.0);
-        assertResult(tuples, Statistic.MAX, 7, "B", 43.0);
-        assertResult(tuples, Statistic.MAX, 8, "B", 43.0);
-
-        assertResult(tuples, Statistic.MAX, 9, "A", 102.0);
-        assertResult(tuples, Statistic.MAX, 10, "C", 700.0);
-        
-        assertResult(tuples, Statistic.MEAN, 0, "A", 1.0);
-        assertResult(tuples, Statistic.MEAN, 1, "B", 7.0);
-        assertResult(tuples, Statistic.MEAN, 2, "C", 4.0);
-
-        assertResult(tuples, Statistic.MEAN, 3, "A", 2.5);
-        assertResult(tuples, Statistic.MEAN, 4, "B", 5.0);
-        assertResult(tuples, Statistic.MEAN, 5, "C", 51.5);
-
-        assertResult(tuples, Statistic.MEAN, 6, "A", 53.0);
-        assertResult(tuples, Statistic.MEAN, 7, "B", 23.0);
-        assertResult(tuples, Statistic.MEAN, 8, "B", 28.0);
-
-        assertResult(tuples, Statistic.MEAN, 9, "A", 51.0);
-        assertResult(tuples, Statistic.MEAN, 10, "C", 399.5);
-    }
-    
-    @Test
-    public void testSlope() throws Exception {
-        Topology topology = newTopology("testSlope");
-        
-        TStream<JsonObject> aggregate = aggregate(topology, Regression.SLOPE);
-        
-        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
-        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
-        complete(topology, count);
-        assertTrue(count.valid());
-        
-        List<JsonObject> tuples = contents.getResult();
-        assertEquals(11, tuples.size());
-                
-        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0", "C700"
-        assertResult(tuples, Regression.SLOPE, 0, "A", null);
-        assertResult(tuples, Regression.SLOPE, 1, "B", null);
-        assertResult(tuples, Regression.SLOPE, 2, "C", null);
-        
-        assertResult(tuples, Regression.SLOPE, 3, "A", 3.0);
-        assertResult(tuples, Regression.SLOPE, 4, "B", -4.0);
-        assertResult(tuples, Regression.SLOPE, 5, "C", 95.0);
-        
-        assertResult(tuples, Regression.SLOPE, 6, "A", 98.0);
-        assertResult(tuples, Regression.SLOPE, 7, "B", 40.0);
-        assertResult(tuples, Regression.SLOPE, 8, "B", -30.0);
-        
-        assertResult(tuples, Regression.SLOPE, 9, "A", -102.0);
-        assertResult(tuples, Regression.SLOPE, 10, "C", 601.0);
-    }
-    
-    @Test
-    public void testMvMaxMean() throws Exception {
-        Topology topology = newTopology("testMvMaxMean");
-
-        TStream<JsonObject> aggregate = mvAggregate(topology, Statistic.MAX, Statistic.MEAN);
-
-        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
-        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
-        complete(topology, count);
-        assertTrue(count.valid());
-
-        List<JsonObject> tuples = contents.getResult();
-        assertEquals(11, tuples.size());
-
-        assertMvOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
-
-        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0",
-        // "C700"
-        assertMvResult(tuples, Statistic.MAX, 0, "A", 1.0);
-        assertMvResult(tuples, Statistic.MAX, 1, "B", 7.0);
-        assertMvResult(tuples, Statistic.MAX, 2, "C", 4.0);
-
-        assertMvResult(tuples, Statistic.MAX, 3, "A", 4.0);
-        assertMvResult(tuples, Statistic.MAX, 4, "B", 7.0);
-        assertMvResult(tuples, Statistic.MAX, 5, "C", 99.0);
-
-        assertMvResult(tuples, Statistic.MAX, 6, "A", 102.0);
-        assertMvResult(tuples, Statistic.MAX, 7, "B", 43.0);
-        assertMvResult(tuples, Statistic.MAX, 8, "B", 43.0);
-
-        assertMvResult(tuples, Statistic.MAX, 9, "A", 102.0);
-        assertMvResult(tuples, Statistic.MAX, 10, "C", 700.0);
-        
-        assertMvResult(tuples, Statistic.MEAN, 0, "A", 1.0);
-        assertMvResult(tuples, Statistic.MEAN, 1, "B", 7.0);
-        assertMvResult(tuples, Statistic.MEAN, 2, "C", 4.0);
-
-        assertMvResult(tuples, Statistic.MEAN, 3, "A", 2.5);
-        assertMvResult(tuples, Statistic.MEAN, 4, "B", 5.0);
-        assertMvResult(tuples, Statistic.MEAN, 5, "C", 51.5);
-
-        assertMvResult(tuples, Statistic.MEAN, 6, "A", 53.0);
-        assertMvResult(tuples, Statistic.MEAN, 7, "B", 23.0);
-        assertMvResult(tuples, Statistic.MEAN, 8, "B", 28.0);
-
-        assertMvResult(tuples, Statistic.MEAN, 9, "A", 51.0);
-        assertMvResult(tuples, Statistic.MEAN, 10, "C", 399.5);
-    }
-	
-	private static void assertResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
-	    JsonObject tuple = tuples.get(index);
-	    assertEquals(key, tuple.get("id").getAsString());
-	    
-	    JsonObject agg = tuple.getAsJsonObject("value");
-        if (value != null) {
-            double result = agg.get(stat.name()).getAsDouble();
-            assertEquals("index:" + index, value, result, 0.01);
-        } else {
-            assertFalse(agg.has(stat.name()));
-        }
-	}
-  
-  private static void assertMvResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
-      JsonObject tuple = tuples.get(index);
-      assertEquals(key, tuple.get("id").getAsString());
-      
-      if (value != null) {
-        Double result = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value", stat).getAsDouble();
-        assertEquals("index:" + index + " value "+stat, value, result, 0.01);
-  
-        Double result2 = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value2", stat).getAsDouble();
-        assertEquals("index:" + index + " value2 "+stat, value+1000, result2, 0.01);
-      }
-      else {
-        assertFalse("index:" + index + " value "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value", stat));
-        assertFalse("index:" + index + " value2 "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value2", stat));
-      }
-  }
-	
-	public static void assertOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
-	    for (JsonObject j : tuples) {
-	        assertTrue(j.has("id")); // Value of the key
-	        assertTrue(j.has("value")); // Value of the key
-	        
-	        JsonObject v = j.getAsJsonObject("value");
-	        for (JsonUnivariateAggregate stat : stats) {
-	            assertTrue(v.has(stat.name()));
-	        }
-	    }
-	}
-  
-  public static void assertMvOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
-      for (JsonObject j : tuples) {
-          assertTrue(j.has("id")); // Value of the key
-          assertTrue(j.has("aggResults")); // Value of the key
-          
-          for (JsonUnivariateAggregate stat : stats) {
-            assertTrue("value "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value", stat));
-          }
-          for (JsonUnivariateAggregate stat : stats) {
-            assertTrue("value2 "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value2", stat));
-          }
-      }
-  }
-	
-	public static TStream<JsonObject> aggregate(Topology topology, JsonUnivariateAggregate ... stats) {
-	       TStream<JsonObject> sourceData = sourceData(topology);
-	        
-	        TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
-	        
-	        return JsonAnalytics.aggregate(window, "id", "value",
-	                j -> j.get("value").getAsDouble(), stats);
-	}
-	
-	public static TStream<JsonObject> sourceData(Topology topology)
-	{
-	       TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
-	        
-	        return seed.map(s -> {
-	            JsonObject j = new JsonObject();
-	            j.addProperty("id", s.substring(0, 1));
-	            j.addProperty("value", Integer.valueOf(s.substring(1)));
-	            return j;
-	        });
-	}
-  
-  public static TStream<JsonObject> mvAggregate(Topology topology, JsonUnivariateAggregate ... stats) {
-         TStream<JsonObject> sourceData = sourceMvData(topology);
-          
-          TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
-          
-          List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
-          aggSpecs.add(JsonAnalytics.mkAggregationSpec("value", stats));
-          aggSpecs.add(JsonAnalytics.mkAggregationSpec("value2", stats));
-          
-          return JsonAnalytics.mvAggregate(window, "id", "aggResults", aggSpecs);
-  }
-  
-  /*
-   * same JsonObject as sourceData() but with an additional 
-   * "value2" variable whose value is the the "value" variable's value + 1000 
-   */
-  public static TStream<JsonObject> sourceMvData(Topology topology)
-  {
-    return sourceData(topology)
-        .map(jo -> {
-          jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
-          return jo;
-        });
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
new file mode 100644
index 0000000..012ee2f
--- /dev/null
+++ b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.analytics.math3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.stat.Regression;
+import org.apache.edgent.analytics.math3.stat.Statistic;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+public class StatisticsTest  extends TopologyAbstractTest implements DirectTestSetup {
+	@Test
+	public void testMin() throws Exception {
+	    Topology topology = newTopology("testMin");
+	    
+	    TStream<JsonObject> aggregate = aggregate(topology, Statistic.MIN);
+	    
+        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+        complete(topology, count);
+        assertTrue(count.valid());
+        
+        List<JsonObject> tuples = contents.getResult();
+        assertEquals(11, tuples.size());
+        
+        assertOutputStructure(tuples, Statistic.MIN);
+        
+        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
+        assertResult(tuples, Statistic.MIN, 0, "A", 1.0);
+        assertResult(tuples, Statistic.MIN, 1, "B", 7.0);
+        assertResult(tuples, Statistic.MIN, 2, "C", 4.0);
+        
+        assertResult(tuples, Statistic.MIN, 3, "A", 1.0);
+        assertResult(tuples, Statistic.MIN, 4, "B", 3.0);
+        assertResult(tuples, Statistic.MIN, 5, "C", 4.0);
+        
+        assertResult(tuples, Statistic.MIN, 6, "A", 4.0);
+        assertResult(tuples, Statistic.MIN, 7, "B", 3.0);
+        assertResult(tuples, Statistic.MIN, 8, "B", 13.0);
+        
+        assertResult(tuples, Statistic.MIN, 9, "A", 0.0);
+        assertResult(tuples, Statistic.MIN, 10, "C", 99.0);
+	}
+	
+    @Test
+    public void testMaxMean() throws Exception {
+        Topology topology = newTopology("testMaxMean");
+
+        TStream<JsonObject> aggregate = aggregate(topology, Statistic.MAX, Statistic.MEAN);
+
+        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+        complete(topology, count);
+        assertTrue(count.valid());
+
+        List<JsonObject> tuples = contents.getResult();
+        assertEquals(11, tuples.size());
+
+        assertOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
+
+        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0",
+        // "C700"
+        assertResult(tuples, Statistic.MAX, 0, "A", 1.0);
+        assertResult(tuples, Statistic.MAX, 1, "B", 7.0);
+        assertResult(tuples, Statistic.MAX, 2, "C", 4.0);
+
+        assertResult(tuples, Statistic.MAX, 3, "A", 4.0);
+        assertResult(tuples, Statistic.MAX, 4, "B", 7.0);
+        assertResult(tuples, Statistic.MAX, 5, "C", 99.0);
+
+        assertResult(tuples, Statistic.MAX, 6, "A", 102.0);
+        assertResult(tuples, Statistic.MAX, 7, "B", 43.0);
+        assertResult(tuples, Statistic.MAX, 8, "B", 43.0);
+
+        assertResult(tuples, Statistic.MAX, 9, "A", 102.0);
+        assertResult(tuples, Statistic.MAX, 10, "C", 700.0);
+        
+        assertResult(tuples, Statistic.MEAN, 0, "A", 1.0);
+        assertResult(tuples, Statistic.MEAN, 1, "B", 7.0);
+        assertResult(tuples, Statistic.MEAN, 2, "C", 4.0);
+
+        assertResult(tuples, Statistic.MEAN, 3, "A", 2.5);
+        assertResult(tuples, Statistic.MEAN, 4, "B", 5.0);
+        assertResult(tuples, Statistic.MEAN, 5, "C", 51.5);
+
+        assertResult(tuples, Statistic.MEAN, 6, "A", 53.0);
+        assertResult(tuples, Statistic.MEAN, 7, "B", 23.0);
+        assertResult(tuples, Statistic.MEAN, 8, "B", 28.0);
+
+        assertResult(tuples, Statistic.MEAN, 9, "A", 51.0);
+        assertResult(tuples, Statistic.MEAN, 10, "C", 399.5);
+    }
+    
+    @Test
+    public void testSlope() throws Exception {
+        Topology topology = newTopology("testSlope");
+        
+        TStream<JsonObject> aggregate = aggregate(topology, Regression.SLOPE);
+        
+        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+        complete(topology, count);
+        assertTrue(count.valid());
+        
+        List<JsonObject> tuples = contents.getResult();
+        assertEquals(11, tuples.size());
+                
+        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
+        assertResult(tuples, Regression.SLOPE, 0, "A", null);
+        assertResult(tuples, Regression.SLOPE, 1, "B", null);
+        assertResult(tuples, Regression.SLOPE, 2, "C", null);
+        
+        assertResult(tuples, Regression.SLOPE, 3, "A", 3.0);
+        assertResult(tuples, Regression.SLOPE, 4, "B", -4.0);
+        assertResult(tuples, Regression.SLOPE, 5, "C", 95.0);
+        
+        assertResult(tuples, Regression.SLOPE, 6, "A", 98.0);
+        assertResult(tuples, Regression.SLOPE, 7, "B", 40.0);
+        assertResult(tuples, Regression.SLOPE, 8, "B", -30.0);
+        
+        assertResult(tuples, Regression.SLOPE, 9, "A", -102.0);
+        assertResult(tuples, Regression.SLOPE, 10, "C", 601.0);
+    }
+    
+    @Test
+    public void testMvMaxMean() throws Exception {
+        Topology topology = newTopology("testMvMaxMean");
+
+        TStream<JsonObject> aggregate = mvAggregate(topology, Statistic.MAX, Statistic.MEAN);
+
+        Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+        complete(topology, count);
+        assertTrue(count.valid());
+
+        List<JsonObject> tuples = contents.getResult();
+        assertEquals(11, tuples.size());
+
+        assertMvOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
+
+        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0",
+        // "C700"
+        assertMvResult(tuples, Statistic.MAX, 0, "A", 1.0);
+        assertMvResult(tuples, Statistic.MAX, 1, "B", 7.0);
+        assertMvResult(tuples, Statistic.MAX, 2, "C", 4.0);
+
+        assertMvResult(tuples, Statistic.MAX, 3, "A", 4.0);
+        assertMvResult(tuples, Statistic.MAX, 4, "B", 7.0);
+        assertMvResult(tuples, Statistic.MAX, 5, "C", 99.0);
+
+        assertMvResult(tuples, Statistic.MAX, 6, "A", 102.0);
+        assertMvResult(tuples, Statistic.MAX, 7, "B", 43.0);
+        assertMvResult(tuples, Statistic.MAX, 8, "B", 43.0);
+
+        assertMvResult(tuples, Statistic.MAX, 9, "A", 102.0);
+        assertMvResult(tuples, Statistic.MAX, 10, "C", 700.0);
+        
+        assertMvResult(tuples, Statistic.MEAN, 0, "A", 1.0);
+        assertMvResult(tuples, Statistic.MEAN, 1, "B", 7.0);
+        assertMvResult(tuples, Statistic.MEAN, 2, "C", 4.0);
+
+        assertMvResult(tuples, Statistic.MEAN, 3, "A", 2.5);
+        assertMvResult(tuples, Statistic.MEAN, 4, "B", 5.0);
+        assertMvResult(tuples, Statistic.MEAN, 5, "C", 51.5);
+
+        assertMvResult(tuples, Statistic.MEAN, 6, "A", 53.0);
+        assertMvResult(tuples, Statistic.MEAN, 7, "B", 23.0);
+        assertMvResult(tuples, Statistic.MEAN, 8, "B", 28.0);
+
+        assertMvResult(tuples, Statistic.MEAN, 9, "A", 51.0);
+        assertMvResult(tuples, Statistic.MEAN, 10, "C", 399.5);
+    }
+	
+	private static void assertResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
+	    JsonObject tuple = tuples.get(index);
+	    assertEquals(key, tuple.get("id").getAsString());
+	    
+	    JsonObject agg = tuple.getAsJsonObject("value");
+        if (value != null) {
+            double result = agg.get(stat.name()).getAsDouble();
+            assertEquals("index:" + index, value, result, 0.01);
+        } else {
+            assertFalse(agg.has(stat.name()));
+        }
+	}
+  
+  private static void assertMvResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
+      JsonObject tuple = tuples.get(index);
+      assertEquals(key, tuple.get("id").getAsString());
+      
+      if (value != null) {
+        Double result = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value", stat).getAsDouble();
+        assertEquals("index:" + index + " value "+stat, value, result, 0.01);
+  
+        Double result2 = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value2", stat).getAsDouble();
+        assertEquals("index:" + index + " value2 "+stat, value+1000, result2, 0.01);
+      }
+      else {
+        assertFalse("index:" + index + " value "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value", stat));
+        assertFalse("index:" + index + " value2 "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value2", stat));
+      }
+  }
+	
+	public static void assertOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
+	    for (JsonObject j : tuples) {
+	        assertTrue(j.has("id")); // Value of the key
+	        assertTrue(j.has("value")); // Value of the key
+	        
+	        JsonObject v = j.getAsJsonObject("value");
+	        for (JsonUnivariateAggregate stat : stats) {
+	            assertTrue(v.has(stat.name()));
+	        }
+	    }
+	}
+  
+  public static void assertMvOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
+      for (JsonObject j : tuples) {
+          assertTrue(j.has("id")); // Value of the key
+          assertTrue(j.has("aggResults")); // Value of the key
+          
+          for (JsonUnivariateAggregate stat : stats) {
+            assertTrue("value "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value", stat));
+          }
+          for (JsonUnivariateAggregate stat : stats) {
+            assertTrue("value2 "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value2", stat));
+          }
+      }
+  }
+	
+	public static TStream<JsonObject> aggregate(Topology topology, JsonUnivariateAggregate ... stats) {
+	       TStream<JsonObject> sourceData = sourceData(topology);
+	        
+	        TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
+	        
+	        return JsonAnalytics.aggregate(window, "id", "value",
+	                j -> j.get("value").getAsDouble(), stats);
+	}
+	
+	public static TStream<JsonObject> sourceData(Topology topology)
+	{
+	       TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
+	        
+	        return seed.map(s -> {
+	            JsonObject j = new JsonObject();
+	            j.addProperty("id", s.substring(0, 1));
+	            j.addProperty("value", Integer.valueOf(s.substring(1)));
+	            return j;
+	        });
+	}
+  
+  public static TStream<JsonObject> mvAggregate(Topology topology, JsonUnivariateAggregate ... stats) {
+         TStream<JsonObject> sourceData = sourceMvData(topology);
+          
+          TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
+          
+          List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
+          aggSpecs.add(JsonAnalytics.mkAggregationSpec("value", stats));
+          aggSpecs.add(JsonAnalytics.mkAggregationSpec("value2", stats));
+          
+          return JsonAnalytics.mvAggregate(window, "id", "aggResults", aggSpecs);
+  }
+  
+  /*
+   * same JsonObject as sourceData() but with an additional 
+   * "value2" variable whose value is the "value" variable's value + 1000
+   */
+  public static TStream<JsonObject> sourceMvData(Topology topology)
+  {
+    return sourceData(topology)
+        .map(jo -> {
+          jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
+          return jo;
+        });
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
----------------------------------------------------------------------
diff --git a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java b/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
deleted file mode 100644
index b7c63d5..0000000
--- a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.sensors;
-
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Function;
-import edgent.function.Predicate;
-
-/**
- * Deadband predicate function.
- *
- * @param <T> Tuple type.
- * @param <V> Value type for the deadband function.
- */
-class Deadband<T, V> implements Predicate<T> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final Function<T, V> valueFunction;
-    private final Predicate<V> inBand;
-    private final long period;
-    private final TimeUnit unit;
-
-    // Always send the first value.
-    private transient boolean outOfBand = true;
-
-    private transient long lastSend;
-    
-    Deadband(Function<T, V> valueFunction, Predicate<V> deadbandFunction) {
-        this(valueFunction , deadbandFunction, 0, null);
-    }
-
-    Deadband(Function<T, V> valueFunction, Predicate<V> inBand, long period, TimeUnit unit) {
-        this.valueFunction = valueFunction;
-        this.inBand = inBand;
-        this.period = period;
-        this.unit = unit;
-    }
-
-    @Override
-    public boolean test(final T t) {
-        final V value = valueFunction.apply(t);
-        boolean passTuple;
-        long now = 0;
-        if (!inBand.test(value)) {
-            outOfBand = true;
-            passTuple = true;
-        } else if (outOfBand) {
-            // When the value returns to being in-band
-            // send the in-band value.
-            outOfBand = false;
-            passTuple = true;
-        } else {
-        	passTuple = false;
-        	if (period != 0) {
-                now = System.currentTimeMillis();
-                long sinceLast = unit.convert(now - lastSend, TimeUnit.MILLISECONDS);
-                if (sinceLast > period)
-                     passTuple = true;
-        	}
-        }
-
-        if (passTuple && period != 0)
-            lastSend = now == 0 ? System.currentTimeMillis() : now;
-            
-        return passTuple;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
----------------------------------------------------------------------
diff --git a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java b/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
deleted file mode 100644
index 1d93241..0000000
--- a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package edgent.analytics.sensors;
-
-import java.util.Date;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Predicate;
-
-/**
- * Deadtime {@link Predicate}.
- * <p>
- * {@link #test(Object) test()} returns true on its initial call
- * and then false for any calls occurring during the following deadtime period.
- * After the end of a deadtime period, the next call to {@code test()} 
- * returns true and a new deadtime period is begun.
- * </p><p>
- * The deadtime period may be changed while the topology is running
- * via {@link #setPeriod(long, TimeUnit)}.
- * </p>
- *
- * @param <T> tuple type
- * @see Filters#deadtime(edgent.topology.TStream, long, TimeUnit) Filters.deadtime()
- */
-public class Deadtime<T> implements Predicate<T> {
-    private static final long serialVersionUID = 1L;
-    private long deadtimePeriodMillis;
-    private long lastTrueTimeMillis;
-    private volatile long nextTrueTimeMillis;
-
-    /**
-     * Create a new Deadtime Predicate
-     * <p>
-     * Same as {@code Deadtime(0, TimeUnit.SECONDS)}
-     */
-    public Deadtime() {
-        setPeriod(0, TimeUnit.SECONDS);
-    }
-    
-    /**
-     * Create a new Deadtime Predicate
-     * <p>
-     * The first received tuple is always "accepted".
-     * @param deadtimePeriod see {@link #setPeriod(long, TimeUnit) setPeriod()}
-     * @param unit {@link TimeUnit} of {@code deadtimePeriod}
-     */
-    public Deadtime(long deadtimePeriod, TimeUnit unit) {
-        setPeriod(deadtimePeriod, unit);
-    }
-    
-    /**
-     * Set the deadtime period
-     * <p>
-     * The end of a currently active deadtime period is shortened or extended
-     * to match the new deadtime period specification.
-     * </p><p>
-     * The deadtime period behavior is subject to the accuracy
-     * of the system's {@link System#currentTimeMillis()}.
-     * A period of less than 1ms is equivalent to specifying 0.
-     * </p>
-     * @param deadtimePeriod the amount of time for {@code test()}
-     *        to return false after returning true.
-     *        Specify a value of 0 for no deadtime period.
-     *        Must be &gt;= 0.
-     * @param unit {@link TimeUnit} of {@code deadtimePeriod}
-     */
-    public synchronized void setPeriod(long deadtimePeriod, TimeUnit unit) {
-        if (deadtimePeriod < 0)
-            throw new IllegalArgumentException("deadtimePeriod");
-        Objects.requireNonNull(unit, "unit");
-        deadtimePeriodMillis = unit.toMillis(deadtimePeriod);
-        nextTrueTimeMillis = lastTrueTimeMillis + deadtimePeriodMillis;
-    }
-
-    /**
-     * Test the deadtime predicate.
-     * @param value ignored
-     * @return false if in a deadtime period, true otherwise
-     */
-    @Override
-    public boolean test(T value) {
-        long now = System.currentTimeMillis(); 
-        if (now < nextTrueTimeMillis)
-            return false;
-        else synchronized(this) {
-            lastTrueTimeMillis = now;
-            nextTrueTimeMillis = now + deadtimePeriodMillis;
-            return true;
-        }
-    }
-
-    /**
-     * Returns a String for development/debug support.  Content subject to change.
-     */
-    @Override
-    public String toString() {
-        return "nextPass after "+new Date(nextTrueTimeMillis);
-    }
-    
-}
\ No newline at end of file