You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:51 UTC
[50/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
deleted file mode 100644
index 66bff5b..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/JsonStorelessStatistic.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * JSON univariate aggregator implementation wrapping a {@code StorelessUnivariateStatistic}.
- */
-public class JsonStorelessStatistic implements JsonUnivariateAggregator {
-
- private final Statistic stat;
- private final StorelessUnivariateStatistic statImpl;
-
- public JsonStorelessStatistic(Statistic stat, StorelessUnivariateStatistic statImpl) {
- this.stat = stat;
- this.statImpl = statImpl;
- }
-
- @Override
- public void clear(JsonElement partition, int n) {
- statImpl.clear();
- }
-
- @Override
- public void increment(double v) {
- statImpl.increment(v);
- }
-
- @Override
- public void result(JsonElement partition, JsonObject result) {
- double rv = statImpl.getResult();
-
- if (Double.isFinite(rv))
- result.addProperty(stat.name(), rv);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
deleted file mode 100644
index 6e3b76c..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Regression.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * Univariate regression aggregates.
- *
- */
-public enum Regression implements JsonUnivariateAggregate {
-
- /**
- * Calculate the slope of a single variable.
- * The slope is calculated using the first
- * order of a ordinary least squares
- * linear regression.
- * The list of values for the single
- * single variable are processed as Y-values
- * that are evenly spaced on the X-axis.
- * <BR>
- * This is useful as a simple determination
- * if the variable is increasing or decreasing.
- * <BR>
- * The slope value is represented as a {@code double}
- * with the key {@code SLOPE} in the aggregate result.
- * <BR>
- * If the window to be aggregated contains less than
- * two values then no regression is performed.
- */
- SLOPE() {
- @Override
- public JsonUnivariateAggregator get() {
- return new JsonOLS(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
deleted file mode 100644
index 49f2a92..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/Statistic.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.math3.stat;
-
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
-import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
-
-import edgent.analytics.math3.json.JsonAnalytics;
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.json.JsonUnivariateAggregator;
-
-/**
- * Statistic implementations.
- *
- * Univariate statistic aggregate calculations against a value
- * extracted from a {@code JsonObject}.
- *
- * @see JsonAnalytics
- */
-public enum Statistic implements JsonUnivariateAggregate {
-
- /**
- * Calculate the arithmetic mean.
- * The mean value is represented as a {@code double}
- * with the key {@code MEAN} in the aggregate result.
- */
- MEAN(new Mean()),
- /**
- * Calculate the minimum.
- * The minimum value is represented as a {@code double}
- * with the key {@code MIN} in the aggregate result.
- */
- MIN(new Min()),
- /**
- * Calculate the maximum.
- * The maximum value is represented as a {@code double}
- * with the key {@code MAXIMUM} in the aggregate result.
- */
- MAX(new Max()),
- /**
- * Calculate the sum.
- * The sum is represented as a {@code double}
- * with the key {@code SUM} in the aggregate result.
- */
- SUM(new Sum()),
- /**
- * Calculate the standard deviation.
- */
- STDDEV(new StandardDeviation());
-
- private final StorelessUnivariateStatistic statImpl;
-
- private Statistic(StorelessUnivariateStatistic statImpl) {
- this.statImpl = statImpl;
- statImpl.clear();
- }
-
- /**
- * Return a new instance of this statistic implementation.
- * @return A new instance of this statistic implementation.
- */
- @Override
- public JsonUnivariateAggregator get() {
- return new JsonStorelessStatistic(this, statImpl.copy());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java b/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
deleted file mode 100644
index c95591f..0000000
--- a/analytics/math3/src/main/java/edgent/analytics/math3/stat/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Statistical algorithms using Apache Commons Math.
- */
-package edgent.analytics.math3.stat;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
new file mode 100644
index 0000000..c6e9108
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonAnalytics.java
@@ -0,0 +1,443 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.ToDoubleFunction;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Apache Commons Math analytics for streams with JSON tuples.
+ *
+ */
+public class JsonAnalytics {
+
+ /**
+ * Aggregate against a single {@code Numeric} variable contained in a JSON object.
+ *
+ * The returned stream contains a tuple for each execution performed against a window partition.
+ * The tuple is a {@code JsonObject} containing:
+ * <UL>
+ * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+ * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code valueProperty}.
+ * This results object contains the results of all aggregations defined by {@code aggregates} against
+ * {@code double} property with key {@code valueProperty}.
+ * <BR>
+ * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+ * object.
+ * </LI>
+ * </UL>
+ * <P>
+ * For example if the window contains these three tuples (pseudo JSON) for
+ * partition 3:
+ * <BR>
+ * <code>{id=3,reading=2.0}, {id=3,reading=2.6}, {id=3,reading=1.8}</code>
+ * <BR>
+ * the resulting aggregation for the stream returned by:
+ * <BR>
+ * {@code aggregate(window, "id", "reading", Statistic.MIN, Statistic.MAX)}
+ * <BR>
+ * would contain this tuple with the maximum and minimum values in the {@code reading}
+ * JSON object:
+ * <BR>
+ * <code>{id=3, reading={N=3, MIN=1.8, MAX=2.6}}</code>
+ * </P>
+ * @param <K> Partition type
+ *
+ * @param window Window to aggregate over.
+ * @param resultPartitionProperty Property to store the partition key in tuples on the returned stream.
+ * @param valueProperty JSON property containing the value to aggregate.
+ * @param aggregates Which aggregations to be performed.
+ * @return Stream that will contain aggregations.
+ */
+ public static <K extends JsonElement> TStream<JsonObject> aggregate(
+ TWindow<JsonObject, K> window,
+ String resultPartitionProperty,
+ String valueProperty,
+ JsonUnivariateAggregate... aggregates) {
+ return aggregate(window, resultPartitionProperty, valueProperty, j -> j.get(valueProperty).getAsDouble(), aggregates);
+
+ }
+
+ /**
+ * Aggregate against a single {@code Numeric} variable contained in a JSON object.
+ *
+ * The returned stream contains a tuple for each execution performed against a window partition.
+ * The tuple is a {@code JsonObject} containing:
+ * <UL>
+ * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+ * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code resultProperty}.
+ * This results object contains the results of all aggregations defined by {@code aggregates} against
+ * value returned by {@code valueGetter}.
+ * <BR>
+ * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+ * object.
+ * </LI>
+ * </UL>
+ *
+ * @param <K> Partition type
+ * @param window Window to aggregate over.
+ * @param resultPartitionProperty Property to store the partition key in tuples on the returned stream.
+ * @param resultProperty Property to store the aggregations in tuples on the returned stream.
+ * @param valueGetter How to obtain the single variable from input tuples.
+ * @param aggregates Which aggregations to be performed.
+ * @return Stream that will contain aggregations.
+ */
+ public static <K extends JsonElement> TStream<JsonObject> aggregate(
+ TWindow<JsonObject, K> window,
+ String resultPartitionProperty,
+ String resultProperty,
+ ToDoubleFunction<JsonObject> valueGetter,
+ JsonUnivariateAggregate... aggregates) {
+
+ return window.aggregate(aggregateList(
+ resultPartitionProperty,
+ resultProperty,
+ valueGetter,
+ aggregates
+ ));
+ }
+
+ /**
+ * Create a Function that aggregates against a single {@code Numeric}
+ * variable contained in a JSON object.
+ *
+ * Calling {@code apply(List<JsonObject>, K)} on the returned {@code BiFunction}
+ * returns a {@link JsonObject} containing:
+ * <UL>
+ * <LI> Partition key of type {@code K} as a property with key {@code resultPartitionProperty}. </LI>
+ * <LI> Aggregation results as a {@code JsonObject} as a property with key {@code resultProperty}.
+ * This results object contains the results of all aggregations defined by {@code aggregates}
+ * against the value returned by {@code valueGetter}.
+ * <BR>
+ * Each {@link JsonUnivariateAggregate} declares how it represents its aggregation in this result
+ * object.
+ * </LI>
+ * </UL>
+ * <P>
+ * For example if the list contains these three tuples (pseudo JSON) for
+ * partition 3:
+ * <BR>
+ * <code>{id=3,reading=2.0}, {id=3,reading=2.6}, {id=3,reading=1.8}</code>
+ * <BR>
+ * the resulting aggregation for the JsonObject returned by:
+ * <BR>
+ * {@code aggregateList("id", "reading", j -> j.get("reading").getAsDouble(), Statistic.MIN, Statistic.MAX).apply(list, 3)}
+ * <BR>
+ * would be this tuple with the maximum and minimum values in the {@code reading}
+ * JSON object:
+ * <BR>
+ * <code>{id=3, reading={N=3, MIN=1.8, MAX=2.6}}</code>
+ * </P>
+ * @param <K> Partition type
+ *
+ * @param resultPartitionProperty Property to store the partition key in the returned JsonObject.
+ * @param resultProperty Property to store the aggregations in the returned JsonObject.
+ * @param valueGetter How to obtain the single variable from input tuples.
+ * @param aggregates Which aggregations to be performed.
+ * @return Function that performs the aggregations.
+ */
+ public static <K extends JsonElement>
+ BiFunction<List<JsonObject>, K, JsonObject> aggregateList(
+ String resultPartitionProperty,
+ String resultProperty,
+ ToDoubleFunction<JsonObject> valueGetter,
+ JsonUnivariateAggregate... aggregates) {
+
+ BiFunction<List<JsonObject>, K, JsonObject> function = (tuples, partition) -> {
+
+ final JsonUnivariateAggregator[] aggregators = new JsonUnivariateAggregator[aggregates.length];
+ for (int i = 0; i < aggregates.length; i++) {
+ aggregators[i] = aggregates[i].get();
+ }
+
+ final JsonObject result = new JsonObject();
+ result.add(resultPartitionProperty, partition);
+ JsonObject aggregateResults = new JsonObject();
+ result.add(resultProperty, aggregateResults);
+
+ final int n = tuples.size();
+ aggregateResults.addProperty(JsonUnivariateAggregate.N, n);
+
+ if (n != 0) {
+
+ for (JsonUnivariateAggregator agg : aggregators) {
+ agg.clear(partition, n);
+ }
+ for (JsonObject tuple : tuples) {
+ double v = valueGetter.applyAsDouble(tuple);
+ for (JsonUnivariateAggregator agg : aggregators) {
+ agg.increment(v);
+ }
+ }
+ for (JsonUnivariateAggregator agg : aggregators) {
+ agg.result(partition, aggregateResults);
+ }
+ }
+
+ return result;
+ };
+
+ return function;
+ }
+
+ /**
+ * Aggregate against multiple {@code Numeric} variables contained in a JSON object.
+ * <P>
+ * This is a multi-variable analog of {@link #aggregate(TWindow, String, String, JsonUnivariateAggregate...) aggregate()}
+ * </P>
+ * <P>
+ * See {@link #mvAggregateList(String, String, List) mvAggregateList()} for
+ * a description of the aggregation processing and result stream.
+ * </P>
+ * <P>
+ * Sample use:
+ * <pre>{@code
+ * // Ingest the data. The JsonObject tuples have properties:
+ * // "id" - the partitionKey
+ * // "tx" - a numeric data variable
+ * // "rx" - a numeric data variable
+ * TStream<JsonObject> ingestData = ...
+ *
+ * // Define the tuple variables and their aggregations to compute
+ * List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
+ * aggSpecs.add(mkAggregationSpec("tx", Statistic.MIN, Statistic.MAX));
+ * aggSpecs.add(mkAggregationSpec("rx", Statistic.MEAN));
+ *
+ * // Create the window over which to aggregate
+ * TWindow<JsonObject, JsonElement> window =
+ * ingestData.last(5, TimeUnit.SECONDS, jo -> jo.get("id"));
+ *
+ * // Create a stream with the aggregations. The result tuples have properties:
+ * // "id" - the partitionKey
+ * // "aggResults" - the aggregation results
+ * TStream<JsonObject> aggResults =
+ * mvAggregate(window, "id", "aggResults", aggSpecs);
+ *
+ * // Create a stream of JsonObject tuples with just the average "rx"
+ * TStream<JsonObject> avgRx = aggResults.map(
+ * jo -> {
+ * JsonObject result = new JsonObject();
+ * result.add("id", jo.get("id"));
+ * result.add("avgRx", getMvAggregate(jo, "aggResults", "rx", Statistic.MEAN));
+ * return result;
+ * });
+ * }</pre>
+ *
+ * @param window the window to compute aggregations over
+ * @param resultPartitionKeyProperty name of the partition key property in the result
+ * @param resultProperty name of the aggregation results property in the result
+ * @param aggregateSpecs see {@link #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()}
+ * @return {@code TStream<JsonObject>} with aggregation results
+ *
+ * @see #mvAggregateList(String, String, List) mvAggregateList()
+ * @see #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()
+ * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+ */
+ public static <K extends JsonElement> TStream<JsonObject> mvAggregate(
+ TWindow<JsonObject, K> window,
+ String resultPartitionKeyProperty,
+ String resultProperty,
+ List<Pair<String, JsonUnivariateAggregate[]>> aggregateSpecs) {
+
+ return window.aggregate(mvAggregateList(
+ resultPartitionKeyProperty,
+ resultProperty,
+ aggregateSpecs
+ ));
+ }
+
+ /**
+ * Create an aggregation specification.
+ * <P>
+ * The aggregation specification specifies a variable name and
+ * the aggregates to compute on it.
+ * </P>
+ * <P>
+ * The specification can be used with {@link #mvAggregateList(String, String, List) mvAggregateList()}
+ * </P>
+ * @param variableName the name of a {@code Numeric} data variable in a JSON object
+ * @param aggregates the aggregates to compute for the variable
+ * @return the aggregation specification
+ */
+ public static Pair<String, JsonUnivariateAggregate[]>
+ mkAggregationSpec(String variableName, JsonUnivariateAggregate... aggregates) {
+ return new Pair<String, JsonUnivariateAggregate[]>(variableName, aggregates);
+ }
+
+ /**
+ * Create a Function that aggregates multiple {@code Numeric}
+ * variables contained in a JSON object.
+ * <P>
+ * This is a multi-variable analog of {@link JsonAnalytics#aggregateList(String, String, org.apache.edgent.function.ToDoubleFunction, JsonUnivariateAggregate...) aggregateList()}
+ * <P>
+ * The overall multi-variable aggregation result is a JSON object
+ * with properties:
+ * <ul>
+ * <li>{@code resultPartitionKeyProperty} whose value is the tuple's partition key</li>
+ * <li>{@code resultProperty} whose value is a JSON object containing
+ * a property for each variable aggregation. The property names
+ * correspond to the variable names from the {@code aggregateSpecs}
+ * and the values are the aggregation results for the variable.
+ * The aggregation results for a variable are a JSON object
+ * having a property for each aggregation name and its value.</li>
+ * </ul>
+ * <P>
+ * For example if the list contains these three tuples (pseudo JSON) for
+ * partition 3:
+ * <BR>
+ * <code>{id=3,tx=2.0,rx=1.0,...}, {id=3,tx=2.6,rx=2.0,...}, {id=3,tx=1.8,rx=3.0,...}</code>
+ * <BR>
+ * the resulting aggregation JsonObject returned is:
+ * <BR>
+ * <code>{id=3, aggData={tx={N=3, MIN=1.8, MAX=2.6}, rx={N=3, MEAN=2.0}}}</code>
+ * <BR>
+ * for the invocation:
+ * <BR>
+ * <code>mvAggregateList("id", "aggData", aggSpecs).apply(list, 3)</code>
+ * <BR>
+ * where {@code aggSpecs} is:
+ * <BR>
+ * {@code
+ * aggSpecs.add(mkAggregationSpec("tx", Statistic.MIN, Statistic.MAX));
+ * aggSpecs.add(mkAggregationSpec("rx", Statistic.MEAN));
+ * }
+ * </P>
+ * <P>
+ * {@link #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()}
+ * can be used to extract individual aggregate values from the result.
+ * </P>
+ *
+ * @param <K> Partition Key as a JsonElement
+ *
+ * @param resultPartitionKeyProperty name of the partition key property in the result
+ * @param resultProperty name of the aggregation results property in the result
+ * @param aggregateSpecs see {@link #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()}
+ * @return Function that performs the aggregations.
+ *
+ * @see #mkAggregationSpec(String, JsonUnivariateAggregate...) mkAggregationSpec()
+ * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+ */
+ public static <K extends JsonElement>
+ BiFunction<List<JsonObject>, K, JsonObject> mvAggregateList(
+ String resultPartitionKeyProperty, String resultProperty,
+ List<Pair<String, JsonUnivariateAggregate[]>> aggregateSpecs) {
+
+ BiFunction<List<JsonObject>, K, JsonObject> function =
+ (joList, partition) -> {
+ JsonObject joResult = new JsonObject();
+ joResult.add(resultPartitionKeyProperty, partition);
+
+ JsonObject aggregateResults = new JsonObject();
+ joResult.add(resultProperty, aggregateResults);
+
+ for (Pair<String, JsonUnivariateAggregate[]> p : aggregateSpecs) {
+ String variableName = p.getFirst();
+ JsonUnivariateAggregate[] aggregates = p.getSecond();
+
+ // Compute the aggregates for the variable
+ JsonObject jo2 = JsonAnalytics.aggregateList(resultPartitionKeyProperty,
+ resultProperty, jo -> jo.get(variableName).getAsDouble(),
+ aggregates).apply(joList, partition);
+
+ // Add the variable's aggregates result to the result
+ aggregateResults.add(variableName, jo2.get(resultProperty).getAsJsonObject());
+ }
+
+ return joResult;
+ };
+
+ return function;
+ }
+
+ /**
+ * Get the value of an aggregate computed by a multi-variable aggregation.
+ * <P>
+ * This convenience method can be used to extract information from a JSON object
+ * created by {@link #mvAggregateList(String, String, List) mvAggregateList()}
+ * or {@link #mvAggregate(TWindow, String, String, List) mvAggregate()}
+ * </P>
+ * <P>
+ * Sample use:
+ * <pre>{@code
+ * ...
+ * TStream<JsonObject> aggData = mvAggregate(window, "id", "aggResults", aggSpecs);
+ *
+ * // Create a stream of JsonObject tuples with just the average "tx"
+ * TStream<JsonObject> avgTx = aggData.map(
+ * jo -> {
+ * JsonObject result = new JsonObject();
+ * result.add("id", jo.get("id"));
+ * result.add("avgTx", getMvAggregate(jo, "aggResults", "tx", Statistic.MEAN));
+ * return result;
+ * });
+ * }</pre>
+ *
+ * @param jo a JSON object created by {@code mvAggregateList}
+ * @param resultProperty the corresponding value passed to {@code mvAggregateList}
+ * @param variableName the data variable of interest in the multivariable aggregates
+ * @param aggregate the variable's aggregate of interest
+ * @return the variable's aggregate's value as a JsonElement, or {@code null} if that aggregate isn't present
+ * @throws RuntimeException if {@code resultProperty} or {@code variableName} isn't present in {@code jo}
+ *
+ * @see #hasMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) hasMvAggregate()
+ * @see #mvAggregate(TWindow, String, String, List) mvAggregate()
+ * @see #mvAggregateList(String, String, List) mvAggregateList()
+ */
+ public static JsonElement getMvAggregate(JsonObject jo, String resultProperty, String variableName, JsonUnivariateAggregate aggregate) {
+ return jo.get(resultProperty).getAsJsonObject()
+ .get(variableName).getAsJsonObject()
+ .get(aggregate.name());
+ }
+
+ /**
+ * Check if an aggregation result from a multi-variable aggregation
+ * is present.
+ *
+ * @param jo a JSON object created by {@code mvAggregateList}
+ * @param resultProperty the corresponding value passed to {@code mvAggregateList}
+ * @param variableName the data variable of interest in the multivariable aggregates
+ * @param aggregate the variable's aggregate of interest
+ * @return true if the specified aggregate is present in the jo, false otherwise.
+ *
+ * @see #getMvAggregate(JsonObject, String, String, JsonUnivariateAggregate) getMvAggregate()
+ */
+ public static boolean hasMvAggregate(JsonObject jo, String resultProperty, String variableName, JsonUnivariateAggregate aggregate) {
+ JsonElement je = jo.get(resultProperty);
+ if (je != null && je.isJsonObject()) {
+ JsonObject jo2 = je.getAsJsonObject();
+ je = jo2.get(variableName);
+ if (je != null && je.isJsonObject()) {
+ jo2 = je.getAsJsonObject();
+ je = jo2.get(aggregate.name());
+ if (je != null)
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
new file mode 100644
index 0000000..f0ff3f6
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregate.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Univariate aggregate for a JSON tuple.
+ * This is the declaration of the aggregate that
+ * applications use when declaring a topology.
+ * <P>
+ * Implementations are typically enums such
+ * as {@link org.apache.edgent.analytics.math3.stat.Statistic Statistic}.
+ * </P>
+ * <P>
+ * Each call to {@code get()} must return a new
+ * {@link JsonUnivariateAggregator aggregator}
+ * that implements the required aggregate.
+ * </P>
+ *
+ * @see JsonAnalytics
+ */
+public interface JsonUnivariateAggregate extends Supplier<JsonUnivariateAggregator>{
+
+ /**
+ * JSON key used for representation of the number
+ * of tuples that were aggregated. Value is {@value}.
+ */
+ public static final String N = "N";
+
+ /**
+ * Name of the aggregate.
+ * The returned value is used as the JSON key containing
+ * the result of the aggregation.
+ * @return Name of the aggregate.
+ */
+ public String name();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
new file mode 100644
index 0000000..2e50cbb
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/JsonUnivariateAggregator.java
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.json;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Univariate aggregator for JSON tuples.
+ * This is the runtime implementation interface
+ * of an aggregate defined by a {@link JsonUnivariateAggregate}.
+ */
+public interface JsonUnivariateAggregator {
+
+ /**
+ * Clear the aggregator to prepare for a new aggregation.
+ * @param partitionKey Partition key.
+ * @param n Number of tuples to be aggregated.
+ */
+ void clear(JsonElement partitionKey, int n);
+
+ /**
+ * Add a value to the aggregation.
+ * @param value Value to be added.
+ */
+ void increment(double value);
+
+ /**
+ * Place the result of the aggregation into the {@code result}
+ * object. The key for the result must be
+ * {@link JsonUnivariateAggregate#name()} for the corresponding
+ * aggregate. The value of the aggregation may be a primitive value
+ * such as a {@code double} or any valid JSON element.
+ *
+ * @param partitionKey Partition key.
+ * @param result JSON object holding the result.
+ */
+ void result(JsonElement partitionKey, JsonObject result);
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
new file mode 100644
index 0000000..be11e22
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/json/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * JSON analytics using Apache Commons Math.
+ */
+package org.apache.edgent.analytics.math3.json;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
new file mode 100644
index 0000000..c1d351b
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonOLS.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+class JsonOLS implements JsonUnivariateAggregator {
+
+ private final Regression type;
+ private final OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
+ private double[] values;
+ private int yOffset;
+
+ JsonOLS(Regression type) {
+ this.type = type;
+ }
+
+ @Override
+ public void clear(JsonElement partition, int n) {
+ values = new double[n*2];
+ yOffset = 0;
+ }
+
+ @Override
+ public void increment(double v) {
+ values[yOffset] = v;
+ yOffset+=2;
+ }
+
+ void setSampleData() {
+ // Fill in the x values
+ for (int x = 0; x < values.length/2; x++)
+ values[(x*2)+1] = x;
+ ols.newSampleData(values, values.length/2, 1);
+ }
+ @Override
+ public void result(JsonElement partition, JsonObject result) {
+ // If there are no values or only a single
+ // value then we cannot calculate the slope.
+ if (values.length <= 2)
+ return;
+
+ setSampleData();
+ double[] regressionParams = ols.estimateRegressionParameters();
+ if (regressionParams.length >= 2) {
+ // [0] is the constant (zero'th order)
+ // [1] is the first-order coefficient, which we use as the slope.
+ final double slope = regressionParams[1];
+ if (Double.isFinite(slope))
+ result.addProperty(type.name(), slope);
+ }
+ values = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
new file mode 100644
index 0000000..702f7c3
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/JsonStorelessStatistic.java
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * JSON univariate aggregator implementation wrapping a {@code StorelessUnivariateStatistic}.
+ */
+public class JsonStorelessStatistic implements JsonUnivariateAggregator {
+
+ private final Statistic stat;
+ private final StorelessUnivariateStatistic statImpl;
+
+ public JsonStorelessStatistic(Statistic stat, StorelessUnivariateStatistic statImpl) {
+ this.stat = stat;
+ this.statImpl = statImpl;
+ }
+
+ @Override
+ public void clear(JsonElement partition, int n) {
+ statImpl.clear();
+ }
+
+ @Override
+ public void increment(double v) {
+ statImpl.increment(v);
+ }
+
+ @Override
+ public void result(JsonElement partition, JsonObject result) {
+ double rv = statImpl.getResult();
+
+ if (Double.isFinite(rv))
+ result.addProperty(stat.name(), rv);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
new file mode 100644
index 0000000..02fc4d0
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Regression.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+/**
+ * Univariate regression aggregates.
+ *
+ */
+public enum Regression implements JsonUnivariateAggregate {
+
+ /**
+ * Calculate the slope of a single variable.
+ * The slope is calculated using the first
+ * order of an ordinary least squares
+ * linear regression.
+ * The list of values for the
+ * single variable is processed as Y-values
+ * that are evenly spaced on the X-axis.
+ * <BR>
+ * This is useful as a simple determination
+ * if the variable is increasing or decreasing.
+ * <BR>
+ * The slope value is represented as a {@code double}
+ * with the key {@code SLOPE} in the aggregate result.
+ * <BR>
+ * If the window to be aggregated contains fewer than
+ * two values then no regression is performed.
+ */
+ SLOPE() {
+ @Override
+ public JsonUnivariateAggregator get() {
+ return new JsonOLS(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
new file mode 100644
index 0000000..1b4b0c1
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/Statistic.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.analytics.math3.stat;
+
+import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.commons.math3.stat.descriptive.rank.Max;
+import org.apache.commons.math3.stat.descriptive.rank.Min;
+import org.apache.commons.math3.stat.descriptive.summary.Sum;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregator;
+
+/**
+ * Statistic implementations.
+ *
+ * Univariate statistic aggregate calculations against a value
+ * extracted from a {@code JsonObject}.
+ *
+ * @see JsonAnalytics
+ */
+public enum Statistic implements JsonUnivariateAggregate {
+
+ /**
+ * Calculate the arithmetic mean.
+ * The mean value is represented as a {@code double}
+ * with the key {@code MEAN} in the aggregate result.
+ */
+ MEAN(new Mean()),
+ /**
+ * Calculate the minimum.
+ * The minimum value is represented as a {@code double}
+ * with the key {@code MIN} in the aggregate result.
+ */
+ MIN(new Min()),
+ /**
+ * Calculate the maximum.
+ * The maximum value is represented as a {@code double}
+ * with the key {@code MAX} in the aggregate result.
+ */
+ MAX(new Max()),
+ /**
+ * Calculate the sum.
+ * The sum is represented as a {@code double}
+ * with the key {@code SUM} in the aggregate result.
+ */
+ SUM(new Sum()),
+ /**
+ * Calculate the standard deviation.
+ */
+ STDDEV(new StandardDeviation());
+
+ private final StorelessUnivariateStatistic statImpl;
+
+ private Statistic(StorelessUnivariateStatistic statImpl) {
+ this.statImpl = statImpl;
+ statImpl.clear();
+ }
+
+ /**
+ * Return a new instance of this statistic implementation.
+ * @return A new instance of this statistic implementation.
+ */
+ @Override
+ public JsonUnivariateAggregator get() {
+ return new JsonStorelessStatistic(this, statImpl.copy());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
new file mode 100644
index 0000000..1112268
--- /dev/null
+++ b/analytics/math3/src/main/java/org/apache/edgent/analytics/math3/stat/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Statistical algorithms using Apache Commons Math.
+ */
+package org.apache.edgent.analytics.math3.stat;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java b/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
deleted file mode 100644
index 59c68c3..0000000
--- a/analytics/math3/src/test/java/edgent/test/analytics/math3/StatisticsTest.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.analytics.math3;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math3.util.Pair;
-import org.junit.Test;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.math3.json.JsonAnalytics;
-import edgent.analytics.math3.json.JsonUnivariateAggregate;
-import edgent.analytics.math3.stat.Regression;
-import edgent.analytics.math3.stat.Statistic;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-public class StatisticsTest extends TopologyAbstractTest implements DirectTestSetup {
- @Test
- public void testMin() throws Exception {
- Topology topology = newTopology("testMin");
-
- TStream<JsonObject> aggregate = aggregate(topology, Statistic.MIN);
-
- Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
- Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
- complete(topology, count);
- assertTrue(count.valid());
-
- List<JsonObject> tuples = contents.getResult();
- assertEquals(11, tuples.size());
-
- assertOutputStructure(tuples, Statistic.MIN);
-
- // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0", "C700"
- assertResult(tuples, Statistic.MIN, 0, "A", 1.0);
- assertResult(tuples, Statistic.MIN, 1, "B", 7.0);
- assertResult(tuples, Statistic.MIN, 2, "C", 4.0);
-
- assertResult(tuples, Statistic.MIN, 3, "A", 1.0);
- assertResult(tuples, Statistic.MIN, 4, "B", 3.0);
- assertResult(tuples, Statistic.MIN, 5, "C", 4.0);
-
- assertResult(tuples, Statistic.MIN, 6, "A", 4.0);
- assertResult(tuples, Statistic.MIN, 7, "B", 3.0);
- assertResult(tuples, Statistic.MIN, 8, "B", 13.0);
-
- assertResult(tuples, Statistic.MIN, 9, "A", 0.0);
- assertResult(tuples, Statistic.MIN, 10, "C", 99.0);
- }
-
- @Test
- public void testMaxMean() throws Exception {
- Topology topology = newTopology("testMaxMean");
-
- TStream<JsonObject> aggregate = aggregate(topology, Statistic.MAX, Statistic.MEAN);
-
- Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
- Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
- complete(topology, count);
- assertTrue(count.valid());
-
- List<JsonObject> tuples = contents.getResult();
- assertEquals(11, tuples.size());
-
- assertOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
-
- // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0",
- // "C700"
- assertResult(tuples, Statistic.MAX, 0, "A", 1.0);
- assertResult(tuples, Statistic.MAX, 1, "B", 7.0);
- assertResult(tuples, Statistic.MAX, 2, "C", 4.0);
-
- assertResult(tuples, Statistic.MAX, 3, "A", 4.0);
- assertResult(tuples, Statistic.MAX, 4, "B", 7.0);
- assertResult(tuples, Statistic.MAX, 5, "C", 99.0);
-
- assertResult(tuples, Statistic.MAX, 6, "A", 102.0);
- assertResult(tuples, Statistic.MAX, 7, "B", 43.0);
- assertResult(tuples, Statistic.MAX, 8, "B", 43.0);
-
- assertResult(tuples, Statistic.MAX, 9, "A", 102.0);
- assertResult(tuples, Statistic.MAX, 10, "C", 700.0);
-
- assertResult(tuples, Statistic.MEAN, 0, "A", 1.0);
- assertResult(tuples, Statistic.MEAN, 1, "B", 7.0);
- assertResult(tuples, Statistic.MEAN, 2, "C", 4.0);
-
- assertResult(tuples, Statistic.MEAN, 3, "A", 2.5);
- assertResult(tuples, Statistic.MEAN, 4, "B", 5.0);
- assertResult(tuples, Statistic.MEAN, 5, "C", 51.5);
-
- assertResult(tuples, Statistic.MEAN, 6, "A", 53.0);
- assertResult(tuples, Statistic.MEAN, 7, "B", 23.0);
- assertResult(tuples, Statistic.MEAN, 8, "B", 28.0);
-
- assertResult(tuples, Statistic.MEAN, 9, "A", 51.0);
- assertResult(tuples, Statistic.MEAN, 10, "C", 399.5);
- }
-
- @Test
- public void testSlope() throws Exception {
- Topology topology = newTopology("testSlope");
-
- TStream<JsonObject> aggregate = aggregate(topology, Regression.SLOPE);
-
- Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
- Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
- complete(topology, count);
- assertTrue(count.valid());
-
- List<JsonObject> tuples = contents.getResult();
- assertEquals(11, tuples.size());
-
- // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0", "C700"
- assertResult(tuples, Regression.SLOPE, 0, "A", null);
- assertResult(tuples, Regression.SLOPE, 1, "B", null);
- assertResult(tuples, Regression.SLOPE, 2, "C", null);
-
- assertResult(tuples, Regression.SLOPE, 3, "A", 3.0);
- assertResult(tuples, Regression.SLOPE, 4, "B", -4.0);
- assertResult(tuples, Regression.SLOPE, 5, "C", 95.0);
-
- assertResult(tuples, Regression.SLOPE, 6, "A", 98.0);
- assertResult(tuples, Regression.SLOPE, 7, "B", 40.0);
- assertResult(tuples, Regression.SLOPE, 8, "B", -30.0);
-
- assertResult(tuples, Regression.SLOPE, 9, "A", -102.0);
- assertResult(tuples, Regression.SLOPE, 10, "C", 601.0);
- }
-
- @Test
- public void testMvMaxMean() throws Exception {
- Topology topology = newTopology("testMvMaxMean");
-
- TStream<JsonObject> aggregate = mvAggregate(topology, Statistic.MAX, Statistic.MEAN);
-
- Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
- Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
- complete(topology, count);
- assertTrue(count.valid());
-
- List<JsonObject> tuples = contents.getResult();
- assertEquals(11, tuples.size());
-
- assertMvOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
-
- // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13.0", "A0",
- // "C700"
- assertMvResult(tuples, Statistic.MAX, 0, "A", 1.0);
- assertMvResult(tuples, Statistic.MAX, 1, "B", 7.0);
- assertMvResult(tuples, Statistic.MAX, 2, "C", 4.0);
-
- assertMvResult(tuples, Statistic.MAX, 3, "A", 4.0);
- assertMvResult(tuples, Statistic.MAX, 4, "B", 7.0);
- assertMvResult(tuples, Statistic.MAX, 5, "C", 99.0);
-
- assertMvResult(tuples, Statistic.MAX, 6, "A", 102.0);
- assertMvResult(tuples, Statistic.MAX, 7, "B", 43.0);
- assertMvResult(tuples, Statistic.MAX, 8, "B", 43.0);
-
- assertMvResult(tuples, Statistic.MAX, 9, "A", 102.0);
- assertMvResult(tuples, Statistic.MAX, 10, "C", 700.0);
-
- assertMvResult(tuples, Statistic.MEAN, 0, "A", 1.0);
- assertMvResult(tuples, Statistic.MEAN, 1, "B", 7.0);
- assertMvResult(tuples, Statistic.MEAN, 2, "C", 4.0);
-
- assertMvResult(tuples, Statistic.MEAN, 3, "A", 2.5);
- assertMvResult(tuples, Statistic.MEAN, 4, "B", 5.0);
- assertMvResult(tuples, Statistic.MEAN, 5, "C", 51.5);
-
- assertMvResult(tuples, Statistic.MEAN, 6, "A", 53.0);
- assertMvResult(tuples, Statistic.MEAN, 7, "B", 23.0);
- assertMvResult(tuples, Statistic.MEAN, 8, "B", 28.0);
-
- assertMvResult(tuples, Statistic.MEAN, 9, "A", 51.0);
- assertMvResult(tuples, Statistic.MEAN, 10, "C", 399.5);
- }
-
- private static void assertResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
- JsonObject tuple = tuples.get(index);
- assertEquals(key, tuple.get("id").getAsString());
-
- JsonObject agg = tuple.getAsJsonObject("value");
- if (value != null) {
- double result = agg.get(stat.name()).getAsDouble();
- assertEquals("index:" + index, value, result, 0.01);
- } else {
- assertFalse(agg.has(stat.name()));
- }
- }
-
- private static void assertMvResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
- JsonObject tuple = tuples.get(index);
- assertEquals(key, tuple.get("id").getAsString());
-
- if (value != null) {
- Double result = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value", stat).getAsDouble();
- assertEquals("index:" + index + " value "+stat, value, result, 0.01);
-
- Double result2 = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value2", stat).getAsDouble();
- assertEquals("index:" + index + " value2 "+stat, value+1000, result2, 0.01);
- }
- else {
- assertFalse("index:" + index + " value "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value", stat));
- assertFalse("index:" + index + " value2 "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value2", stat));
- }
- }
-
- public static void assertOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
- for (JsonObject j : tuples) {
- assertTrue(j.has("id")); // Value of the key
- assertTrue(j.has("value")); // Value of the key
-
- JsonObject v = j.getAsJsonObject("value");
- for (JsonUnivariateAggregate stat : stats) {
- assertTrue(v.has(stat.name()));
- }
- }
- }
-
- public static void assertMvOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
- for (JsonObject j : tuples) {
- assertTrue(j.has("id")); // Value of the key
- assertTrue(j.has("aggResults")); // Value of the key
-
- for (JsonUnivariateAggregate stat : stats) {
- assertTrue("value "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value", stat));
- }
- for (JsonUnivariateAggregate stat : stats) {
- assertTrue("value2 "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value2", stat));
- }
- }
- }
-
- public static TStream<JsonObject> aggregate(Topology topology, JsonUnivariateAggregate ... stats) {
- TStream<JsonObject> sourceData = sourceData(topology);
-
- TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
-
- return JsonAnalytics.aggregate(window, "id", "value",
- j -> j.get("value").getAsDouble(), stats);
- }
-
- public static TStream<JsonObject> sourceData(Topology topology)
- {
- TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
-
- return seed.map(s -> {
- JsonObject j = new JsonObject();
- j.addProperty("id", s.substring(0, 1));
- j.addProperty("value", Integer.valueOf(s.substring(1)));
- return j;
- });
- }
-
- public static TStream<JsonObject> mvAggregate(Topology topology, JsonUnivariateAggregate ... stats) {
- TStream<JsonObject> sourceData = sourceMvData(topology);
-
- TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
-
- List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
- aggSpecs.add(JsonAnalytics.mkAggregationSpec("value", stats));
- aggSpecs.add(JsonAnalytics.mkAggregationSpec("value2", stats));
-
- return JsonAnalytics.mvAggregate(window, "id", "aggResults", aggSpecs);
- }
-
- /*
- * same JsonObject as sourceData() but with an additional
- * "value2" variable whose value is the the "value" variable's value + 1000
- */
- public static TStream<JsonObject> sourceMvData(Topology topology)
- {
- return sourceData(topology)
- .map(jo -> {
- jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
- return jo;
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
----------------------------------------------------------------------
diff --git a/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
new file mode 100644
index 0000000..012ee2f
--- /dev/null
+++ b/analytics/math3/src/test/java/org/apache/edgent/test/analytics/math3/StatisticsTest.java
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.analytics.math3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
+import org.apache.edgent.analytics.math3.stat.Regression;
+import org.apache.edgent.analytics.math3.stat.Statistic;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+public class StatisticsTest extends TopologyAbstractTest implements DirectTestSetup {
+ @Test
+ public void testMin() throws Exception {
+ Topology topology = newTopology("testMin");
+
+ TStream<JsonObject> aggregate = aggregate(topology, Statistic.MIN);
+
+ Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+ Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+ complete(topology, count);
+ assertTrue(count.valid());
+
+ List<JsonObject> tuples = contents.getResult();
+ assertEquals(11, tuples.size());
+
+ assertOutputStructure(tuples, Statistic.MIN);
+
+ // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
+ assertResult(tuples, Statistic.MIN, 0, "A", 1.0);
+ assertResult(tuples, Statistic.MIN, 1, "B", 7.0);
+ assertResult(tuples, Statistic.MIN, 2, "C", 4.0);
+
+ assertResult(tuples, Statistic.MIN, 3, "A", 1.0);
+ assertResult(tuples, Statistic.MIN, 4, "B", 3.0);
+ assertResult(tuples, Statistic.MIN, 5, "C", 4.0);
+
+ assertResult(tuples, Statistic.MIN, 6, "A", 4.0);
+ assertResult(tuples, Statistic.MIN, 7, "B", 3.0);
+ assertResult(tuples, Statistic.MIN, 8, "B", 13.0);
+
+ assertResult(tuples, Statistic.MIN, 9, "A", 0.0);
+ assertResult(tuples, Statistic.MIN, 10, "C", 99.0);
+ }
+
+ @Test
+ public void testMaxMean() throws Exception {
+ Topology topology = newTopology("testMaxMean");
+
+ TStream<JsonObject> aggregate = aggregate(topology, Statistic.MAX, Statistic.MEAN);
+
+ Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+ Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+ complete(topology, count);
+ assertTrue(count.valid());
+
+ List<JsonObject> tuples = contents.getResult();
+ assertEquals(11, tuples.size());
+
+ assertOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
+
+ // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0",
+ // "C700"
+ assertResult(tuples, Statistic.MAX, 0, "A", 1.0);
+ assertResult(tuples, Statistic.MAX, 1, "B", 7.0);
+ assertResult(tuples, Statistic.MAX, 2, "C", 4.0);
+
+ assertResult(tuples, Statistic.MAX, 3, "A", 4.0);
+ assertResult(tuples, Statistic.MAX, 4, "B", 7.0);
+ assertResult(tuples, Statistic.MAX, 5, "C", 99.0);
+
+ assertResult(tuples, Statistic.MAX, 6, "A", 102.0);
+ assertResult(tuples, Statistic.MAX, 7, "B", 43.0);
+ assertResult(tuples, Statistic.MAX, 8, "B", 43.0);
+
+ assertResult(tuples, Statistic.MAX, 9, "A", 102.0);
+ assertResult(tuples, Statistic.MAX, 10, "C", 700.0);
+
+ assertResult(tuples, Statistic.MEAN, 0, "A", 1.0);
+ assertResult(tuples, Statistic.MEAN, 1, "B", 7.0);
+ assertResult(tuples, Statistic.MEAN, 2, "C", 4.0);
+
+ assertResult(tuples, Statistic.MEAN, 3, "A", 2.5);
+ assertResult(tuples, Statistic.MEAN, 4, "B", 5.0);
+ assertResult(tuples, Statistic.MEAN, 5, "C", 51.5);
+
+ assertResult(tuples, Statistic.MEAN, 6, "A", 53.0);
+ assertResult(tuples, Statistic.MEAN, 7, "B", 23.0);
+ assertResult(tuples, Statistic.MEAN, 8, "B", 28.0);
+
+ assertResult(tuples, Statistic.MEAN, 9, "A", 51.0);
+ assertResult(tuples, Statistic.MEAN, 10, "C", 399.5);
+ }
+
+ @Test
+ public void testSlope() throws Exception {
+ Topology topology = newTopology("testSlope");
+
+ TStream<JsonObject> aggregate = aggregate(topology, Regression.SLOPE);
+
+ Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+ Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+ complete(topology, count);
+ assertTrue(count.valid());
+
+ List<JsonObject> tuples = contents.getResult();
+ assertEquals(11, tuples.size());
+
+ // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
+ assertResult(tuples, Regression.SLOPE, 0, "A", null);
+ assertResult(tuples, Regression.SLOPE, 1, "B", null);
+ assertResult(tuples, Regression.SLOPE, 2, "C", null);
+
+ assertResult(tuples, Regression.SLOPE, 3, "A", 3.0);
+ assertResult(tuples, Regression.SLOPE, 4, "B", -4.0);
+ assertResult(tuples, Regression.SLOPE, 5, "C", 95.0);
+
+ assertResult(tuples, Regression.SLOPE, 6, "A", 98.0);
+ assertResult(tuples, Regression.SLOPE, 7, "B", 40.0);
+ assertResult(tuples, Regression.SLOPE, 8, "B", -30.0);
+
+ assertResult(tuples, Regression.SLOPE, 9, "A", -102.0);
+ assertResult(tuples, Regression.SLOPE, 10, "C", 601.0);
+ }
+
+ @Test
+ public void testMvMaxMean() throws Exception {
+ Topology topology = newTopology("testMvMaxMean");
+
+ TStream<JsonObject> aggregate = mvAggregate(topology, Statistic.MAX, Statistic.MEAN);
+
+ Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 11);
+ Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
+ complete(topology, count);
+ assertTrue(count.valid());
+
+ List<JsonObject> tuples = contents.getResult();
+ assertEquals(11, tuples.size());
+
+ assertMvOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
+
+ // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0",
+ // "C700"
+ assertMvResult(tuples, Statistic.MAX, 0, "A", 1.0);
+ assertMvResult(tuples, Statistic.MAX, 1, "B", 7.0);
+ assertMvResult(tuples, Statistic.MAX, 2, "C", 4.0);
+
+ assertMvResult(tuples, Statistic.MAX, 3, "A", 4.0);
+ assertMvResult(tuples, Statistic.MAX, 4, "B", 7.0);
+ assertMvResult(tuples, Statistic.MAX, 5, "C", 99.0);
+
+ assertMvResult(tuples, Statistic.MAX, 6, "A", 102.0);
+ assertMvResult(tuples, Statistic.MAX, 7, "B", 43.0);
+ assertMvResult(tuples, Statistic.MAX, 8, "B", 43.0);
+
+ assertMvResult(tuples, Statistic.MAX, 9, "A", 102.0);
+ assertMvResult(tuples, Statistic.MAX, 10, "C", 700.0);
+
+ assertMvResult(tuples, Statistic.MEAN, 0, "A", 1.0);
+ assertMvResult(tuples, Statistic.MEAN, 1, "B", 7.0);
+ assertMvResult(tuples, Statistic.MEAN, 2, "C", 4.0);
+
+ assertMvResult(tuples, Statistic.MEAN, 3, "A", 2.5);
+ assertMvResult(tuples, Statistic.MEAN, 4, "B", 5.0);
+ assertMvResult(tuples, Statistic.MEAN, 5, "C", 51.5);
+
+ assertMvResult(tuples, Statistic.MEAN, 6, "A", 53.0);
+ assertMvResult(tuples, Statistic.MEAN, 7, "B", 23.0);
+ assertMvResult(tuples, Statistic.MEAN, 8, "B", 28.0);
+
+ assertMvResult(tuples, Statistic.MEAN, 9, "A", 51.0);
+ assertMvResult(tuples, Statistic.MEAN, 10, "C", 399.5);
+ }
+
+ private static void assertResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
+ JsonObject tuple = tuples.get(index);
+ assertEquals(key, tuple.get("id").getAsString());
+
+ JsonObject agg = tuple.getAsJsonObject("value");
+ if (value != null) {
+ double result = agg.get(stat.name()).getAsDouble();
+ assertEquals("index:" + index, value, result, 0.01);
+ } else {
+ assertFalse(agg.has(stat.name()));
+ }
+ }
+
+ private static void assertMvResult(List<JsonObject> tuples, JsonUnivariateAggregate stat, int index, String key, Double value) {
+ JsonObject tuple = tuples.get(index);
+ assertEquals(key, tuple.get("id").getAsString());
+
+ if (value != null) {
+ Double result = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value", stat).getAsDouble();
+ assertEquals("index:" + index + " value "+stat, value, result, 0.01);
+
+ Double result2 = JsonAnalytics.getMvAggregate(tuple, "aggResults", "value2", stat).getAsDouble();
+ assertEquals("index:" + index + " value2 "+stat, value+1000, result2, 0.01);
+ }
+ else {
+ assertFalse("index:" + index + " value "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value", stat));
+ assertFalse("index:" + index + " value2 "+stat, JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value2", stat));
+ }
+ }
+
+ public static void assertOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
+ for (JsonObject j : tuples) {
+ assertTrue(j.has("id")); // Value of the key
+ assertTrue(j.has("value")); // Aggregation results, one entry per statistic
+
+ JsonObject v = j.getAsJsonObject("value");
+ for (JsonUnivariateAggregate stat : stats) {
+ assertTrue(v.has(stat.name()));
+ }
+ }
+ }
+
+ public static void assertMvOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
+ for (JsonObject j : tuples) {
+ assertTrue(j.has("id")); // Value of the key
+ assertTrue(j.has("aggResults")); // Multi-variable aggregation results
+
+ for (JsonUnivariateAggregate stat : stats) {
+ assertTrue("value "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value", stat));
+ }
+ for (JsonUnivariateAggregate stat : stats) {
+ assertTrue("value2 "+stat, JsonAnalytics.hasMvAggregate(j, "aggResults", "value2", stat));
+ }
+ }
+ }
+
+ public static TStream<JsonObject> aggregate(Topology topology, JsonUnivariateAggregate ... stats) {
+ TStream<JsonObject> sourceData = sourceData(topology);
+
+ TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
+
+ return JsonAnalytics.aggregate(window, "id", "value",
+ j -> j.get("value").getAsDouble(), stats);
+ }
+
+ public static TStream<JsonObject> sourceData(Topology topology)
+ {
+ TStream<String> seed = topology.strings("A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700");
+
+ return seed.map(s -> {
+ JsonObject j = new JsonObject();
+ j.addProperty("id", s.substring(0, 1));
+ j.addProperty("value", Integer.valueOf(s.substring(1)));
+ return j;
+ });
+ }
+
+ public static TStream<JsonObject> mvAggregate(Topology topology, JsonUnivariateAggregate ... stats) {
+ TStream<JsonObject> sourceData = sourceMvData(topology);
+
+ TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
+
+ List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
+ aggSpecs.add(JsonAnalytics.mkAggregationSpec("value", stats));
+ aggSpecs.add(JsonAnalytics.mkAggregationSpec("value2", stats));
+
+ return JsonAnalytics.mvAggregate(window, "id", "aggResults", aggSpecs);
+ }
+
+ /*
+ * same JsonObject as sourceData() but with an additional
+ * "value2" variable whose value is the "value" variable's value + 1000
+ */
+ public static TStream<JsonObject> sourceMvData(Topology topology)
+ {
+ return sourceData(topology)
+ .map(jo -> {
+ jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
+ return jo;
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
----------------------------------------------------------------------
diff --git a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java b/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
deleted file mode 100644
index b7c63d5..0000000
--- a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadband.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.analytics.sensors;
-
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Function;
-import edgent.function.Predicate;
-
-/**
- * Deadband predicate function.
- *
- * @param <T> Tuple type.
- * @param <V> Value type for the deadband function.
- */
-class Deadband<T, V> implements Predicate<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Function<T, V> valueFunction;
- private final Predicate<V> inBand;
- private final long period;
- private final TimeUnit unit;
-
- // Always send the first value.
- private transient boolean outOfBand = true;
-
- private transient long lastSend;
-
- Deadband(Function<T, V> valueFunction, Predicate<V> deadbandFunction) {
- this(valueFunction , deadbandFunction, 0, null);
- }
-
- Deadband(Function<T, V> valueFunction, Predicate<V> inBand, long period, TimeUnit unit) {
- this.valueFunction = valueFunction;
- this.inBand = inBand;
- this.period = period;
- this.unit = unit;
- }
-
- @Override
- public boolean test(final T t) {
- final V value = valueFunction.apply(t);
- boolean passTuple;
- long now = 0;
- if (!inBand.test(value)) {
- outOfBand = true;
- passTuple = true;
- } else if (outOfBand) {
- // When the value returns to being in-band
- // send the in-band value.
- outOfBand = false;
- passTuple = true;
- } else {
- passTuple = false;
- if (period != 0) {
- now = System.currentTimeMillis();
- long sinceLast = unit.convert(now - lastSend, TimeUnit.MILLISECONDS);
- if (sinceLast > period)
- passTuple = true;
- }
- }
-
- if (passTuple && period != 0)
- lastSend = now == 0 ? System.currentTimeMillis() : now;
-
- return passTuple;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
----------------------------------------------------------------------
diff --git a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java b/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
deleted file mode 100644
index 1d93241..0000000
--- a/analytics/sensors/src/main/java/edgent/analytics/sensors/Deadtime.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package edgent.analytics.sensors;
-
-import java.util.Date;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Predicate;
-
-/**
- * Deadtime {@link Predicate}.
- * <p>
- * {@link #test(Object) test()} returns true on its initial call
- * and then false for any calls occurring during the following deadtime period.
- * After the end of a deadtime period, the next call to {@code test()}
- * returns true and a new deadtime period is begun.
- * </p><p>
- * The deadtime period may be changed while the topology is running
- * via {@link #setPeriod(long, TimeUnit)}.
- * </p>
- *
- * @param <T> tuple type
- * @see Filters#deadtime(edgent.topology.TStream, long, TimeUnit) Filters.deadtime()
- */
-public class Deadtime<T> implements Predicate<T> {
- private static final long serialVersionUID = 1L;
- private long deadtimePeriodMillis;
- private long lastTrueTimeMillis;
- private volatile long nextTrueTimeMillis;
-
- /**
- * Create a new Deadtime Predicate
- * <p>
- * Same as {@code Deadtime(0, TimeUnit.SECONDS)}
- */
- public Deadtime() {
- setPeriod(0, TimeUnit.SECONDS);
- }
-
- /**
- * Create a new Deadtime Predicate
- * <p>
- * The first received tuple is always "accepted".
- * @param deadtimePeriod see {@link #setPeriod(long, TimeUnit) setPeriod()}
- * @param unit {@link TimeUnit} of {@code deadtimePeriod}
- */
- public Deadtime(long deadtimePeriod, TimeUnit unit) {
- setPeriod(deadtimePeriod, unit);
- }
-
- /**
- * Set the deadtime period
- * <p>
- * The end of a currently active deadtime period is shortened or extended
- * to match the new deadtime period specification.
- * </p><p>
- * The deadtime period behavior is subject to the accuracy
- * of the system's {@link System#currentTimeMillis()}.
- * A period of less than 1ms is equivalent to specifying 0.
- * </p>
- * @param deadtimePeriod the amount of time for {@code test()}
- * to return false after returning true.
- * Specify a value of 0 for no deadtime period.
- * Must be >= 0.
- * @param unit {@link TimeUnit} of {@code deadtimePeriod}
- */
- public synchronized void setPeriod(long deadtimePeriod, TimeUnit unit) {
- if (deadtimePeriod < 0)
- throw new IllegalArgumentException("deadtimePeriod");
- Objects.requireNonNull(unit, "unit");
- deadtimePeriodMillis = unit.toMillis(deadtimePeriod);
- nextTrueTimeMillis = lastTrueTimeMillis + deadtimePeriodMillis;
- }
-
- /**
- * Test the deadtime predicate.
- * @param value ignored
- * @return false if in a deadtime period, true otherwise
- */
- @Override
- public boolean test(T value) {
- long now = System.currentTimeMillis();
- if (now < nextTrueTimeMillis)
- return false;
- else synchronized(this) {
- lastTrueTimeMillis = now;
- nextTrueTimeMillis = now + deadtimePeriodMillis;
- return true;
- }
- }
-
- /**
- * Returns a String for development/debug support. Content subject to change.
- */
- @Override
- public String toString() {
- return "nextPass after "+new Date(nextTrueTimeMillis);
- }
-
-}
\ No newline at end of file