You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/03/04 20:48:47 UTC

[26/50] [abbrv] incubator-apex-core git commit: Added example code for setting dimensionScheme for ADT

Added example code for setting dimensionScheme for ADT


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c8cc637e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c8cc637e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c8cc637e

Branch: refs/heads/master
Commit: c8cc637ea3963e731deba35feb54a0cb5a27b4f1
Parents: c237a24
Author: David Yan <da...@datatorrent.com>
Authored: Thu Nov 5 11:47:06 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sun Feb 28 22:46:38 2016 -0800

----------------------------------------------------------------------
 autometrics/autometrics.md | 72 ++++++++++++++++++++++++++++++++---------
 1 file changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c8cc637e/autometrics/autometrics.md
----------------------------------------------------------------------
diff --git a/autometrics/autometrics.md b/autometrics/autometrics.md
index f1c9a27..2ed0d9e 100644
--- a/autometrics/autometrics.md
+++ b/autometrics/autometrics.md
@@ -11,10 +11,10 @@ An `AutoMetric` can be any object. It can be of a primitive type - int, long, et
 public class LineReceiver extends BaseOperator
 {
  @AutoMetric
- int length;
+ long length;
 
  @AutoMetric
- int count;
+ long count;
 
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
@@ -40,7 +40,7 @@ There are 2 auto-metrics declared in the `LineReceiver`. At the end of each appl
 # Aggregating AutoMetrics across Partitions
 When an operator is partitioned, it is useful to aggregate the values of auto-metrics across all its partitions every window to get a logical view of these metrics. The application master performs these aggregations using metrics aggregators.
 
-The AutoMetric API helps to achieve this by providing an interface for writing aggregators- `AutoMetric.Aggregator`. Any implementation of `AutoMetric.Aggregator` can be set as an operator attribute - `METRICS_AGGREGATOR` for a particular operator which in turn is used for aggregating physical metrics. 
+The AutoMetric API helps to achieve this by providing an interface for writing aggregators- `AutoMetric.Aggregator`. Any implementation of `AutoMetric.Aggregator` can be set as an operator attribute - `METRICS_AGGREGATOR` for a particular operator which in turn is used for aggregating physical metrics.
 
 ## Default aggregators
 [`MetricsAggregator`](https://github.com/apache/incubator-apex-core/blob/devel-3/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java) is a simple implementation of `AutoMetric.Aggregator` that platform uses as a default for summing up primitive types - int, long, float and double.
@@ -78,8 +78,8 @@ public class AnotherLineReceiver extends BaseOperator
 
   public static class LineMetrics implements Serializable
   {
-    int length;
-    int count;
+    long length;
+    long count;
 
     private static final long serialVersionUID = 201511041908L;
   }
@@ -111,7 +111,7 @@ public class AvgLineLengthAggregator implements AutoMetric.Aggregator
 ```
 An instance of above aggregator can be specified as the `METRIC_AGGREGATOR` for `AnotherLineReceiver` while creating the DAG as shown below.
 
-```
+```java
   @Override
   public void populateDAG(DAG dag, Configuration configuration)
   {
@@ -130,10 +130,10 @@ GET /ws/v2/applications/{appid}/logicalPlan/operators/{opName}
 {
     ...
     "autoMetrics": {
-       "count": "71314", 
+       "count": "71314",
        "length": "27780706"
     },
-    "className": "com.datatorrent.autometric.LineReceiver", 
+    "className": "com.datatorrent.autometric.LineReceiver",
     ...
 }
 ```
@@ -156,13 +156,13 @@ The Gateway REST API provides a way to retrieve the latest values for all of the
 GET /ws/v2/applications/{appid}/logicalPlan/operators/{opName}
 {
     ...
-    "cpuPercentageMA": "{cpuPercentageMA}", 
-    "failureCount": "{failureCount}", 
+    "cpuPercentageMA": "{cpuPercentageMA}",
+    "failureCount": "{failureCount}",
     "latencyMA": "{latencyMA}",  
-    "totalTuplesEmitted": "{totalTuplesEmitted}", 
-    "totalTuplesProcessed": "{totalTuplesProcessed}", 
-    "tuplesEmittedPSMA": "{tuplesEmittedPSMA}", 
-    "tuplesProcessedPSMA": "{tuplesProcessedPSMA}", 
+    "totalTuplesEmitted": "{totalTuplesEmitted}",
+    "totalTuplesProcessed": "{totalTuplesProcessed}",
+    "tuplesEmittedPSMA": "{tuplesEmittedPSMA}",
+    "tuplesProcessedPSMA": "{tuplesProcessedPSMA}",
     ...
 }
 ```
@@ -222,7 +222,7 @@ public class AggregatorIIRAVG extends AbstractIncrementalAggregator
     double[] destVals = dest.getAggregates().getFieldsDouble();
     double[] srcVals = src.getAggregates().getFieldsDouble();
 
-    for(int index = 0; index < destLongs.length; index++) {
+    for (int index = 0; index < destLongs.length; index++) {
       destVals[index] = .5 * destVals[index] + .5 * srcVals[index];
     }
   }
@@ -251,6 +251,48 @@ AppDataTracker searches for custom aggregator jars under the following directori
 
 It uses reflection to find all the classes that extend from `IncrementalAggregator` and `OTFAggregator` in these jars and registers them with the name provided by `@Name` annotation (or class name when `@Name` is absent).
 
+# Using `METRICS_DIMENSIONS_SCHEME`
+
+Here is a sample code snippet on how you can make use of `METRICS_DIMENSIONS_SCHEME` to set your own time buckets and your own set of aggregators for certain `AutoMetric`s performed by the App Data Tracker in your application.
+
+```java
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    ...
+    LineReceiver lineReceiver = dag.addOperator("LineReceiver", new LineReceiver());
+    ...
+    AutoMetric.DimensionsScheme dimensionsScheme = new AutoMetric.DimensionsScheme()
+    {
+      String[] timeBuckets = new String[] { "1s", "1m", "1h" };
+      String[] lengthAggregators = new String[] { "IIRAVG", "SUM" };
+      String[] countAggregators = new String[] { "SUM" };
+
+      /* Setting the aggregation time bucket to be one second, one minute and one hour */
+      @Override
+      public String[] getTimeBuckets()
+      {
+        return timeBuckets;
+      }
+
+      @Override
+      public String[] getDimensionAggregationsFor(String logicalMetricName)
+      {
+        if ("length".equals(logicalMetricName)) {
+          return lengthAggregators;
+        } else if ("count".equals(logicalMetricName)) {
+          return countAggregators;
+        } else {
+          return null; // use default
+        }
+      }
+    };
+
+    dag.setAttribute(lineReceiver, OperatorContext.METRICS_DIMENSIONS_SCHEME, dimensionsScheme);
+    ...
+  }
+```
+
+
 # Dashboards
 With App Data Tracker enabled, you can visualize the AutoMetrics and system metrics in the Dashboards within dtManage.   Refer back to the diagram in the App Data Tracker section, dtGateway relays queries and query results to and from the App Data Tracker.  In this way, dtManage sends queries and receives results from the App Data Tracker via dtGateway and uses the results to let the user visualize the data.