You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:54 UTC

[08/50] [abbrv] hadoop git commit: YARN-3816. [Aggregation] App-level aggregation and accumulation for YARN system metrics (Li Lu via sjlee)

YARN-3816. [Aggregation] App-level aggregation and accumulation for YARN system metrics (Li Lu via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/78ffdf08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/78ffdf08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/78ffdf08

Branch: refs/heads/YARN-2928
Commit: 78ffdf087a787aef6a105a98094816ac7f8a4889
Parents: a5bf4fa
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Apr 22 10:24:40 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:06 2016 -0700

----------------------------------------------------------------------
 .../records/timelineservice/TimelineMetric.java | 140 ++++++++++--
 .../TimelineMetricCalculator.java               | 115 ++++++++++
 .../TimelineMetricOperation.java                | 167 +++++++++++++++
 .../timelineservice/TestTimelineMetric.java     | 100 +++++++++
 .../TestTimelineServiceRecords.java             |   6 +-
 .../timelineservice/NMTimelinePublisher.java    |   4 +
 .../collector/AppLevelTimelineCollector.java    |  72 +++++++
 .../collector/TimelineCollector.java            | 213 ++++++++++++++++++-
 .../storage/TimelineAggregationTrack.java       |   2 +-
 .../collector/TestTimelineCollector.java        | 127 +++++++++++
 .../TestFileSystemTimelineWriterImpl.java       |  43 +++-
 .../storage/TestHBaseTimelineStorage.java       |  35 ++-
 12 files changed, 998 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
index 2f60515..f0c6849 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.yarn.api.records.timelineservice;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -48,13 +49,13 @@ public class TimelineMetric {
 
   private Type type;
   private String id;
-  private Comparator<Long> reverseComparator = new Comparator<Long>() {
-    @Override
-    public int compare(Long l1, Long l2) {
-      return l2.compareTo(l1);
-    }
-  };
-  private TreeMap<Long, Number> values = new TreeMap<>(reverseComparator);
+  // By default, do not perform any aggregation operation. This field will NOT
+  // be persisted (like a "transient" member).
+  private TimelineMetricOperation realtimeAggregationOp
+      = TimelineMetricOperation.NOP;
+
+  private TreeMap<Long, Number> values
+      = new TreeMap<>(Collections.reverseOrder());
 
   public TimelineMetric() {
     this(Type.SINGLE_VALUE);
@@ -83,6 +84,26 @@ public class TimelineMetric {
     this.id = metricId;
   }
 
+  /**
+   * Get the real time aggregation operation of this metric.
+   *
+   * @return Real time aggregation operation
+   */
+  public TimelineMetricOperation getRealtimeAggregationOp() {
+    return realtimeAggregationOp;
+  }
+
+  /**
+   * Set the real time aggregation operation of this metric.
+   *
+   * @param op A timeline metric operation that the metric should perform on
+   *           real time aggregations
+   */
+  public void setRealtimeAggregationOp(
+      final TimelineMetricOperation op) {
+    this.realtimeAggregationOp = op;
+  }
+
   // required by JAXB
   @InterfaceAudience.Private
   @XmlElement(name = "values")
@@ -98,8 +119,8 @@ public class TimelineMetric {
     if (type == Type.SINGLE_VALUE) {
       overwrite(vals);
     } else {
-      if (values != null) {
-        this.values = new TreeMap<Long, Number>(reverseComparator);
+      if (vals != null) {
+        this.values = new TreeMap<>(Collections.reverseOrder());
         this.values.putAll(vals);
       } else {
         this.values = null;
@@ -166,11 +187,100 @@ public class TimelineMetric {
 
   @Override
   public String toString() {
-    String str = "{id:" + id + ", type:" + type;
-    if (!values.isEmpty()) {
-      str += ", values:" + values;
+    // values can be assigned null elsewhere in this class, so rely on
+    // null-safe string concatenation instead of calling values.toString().
+    return "{id: " + id + ", type: " + type + ", realtimeAggregationOp: " +
+        realtimeAggregationOp + "; " + values + "}";
+  }
+
+  /**
+   * Get the latest data point of a metric as a single value metric.
+   *
+   * @param metric Incoming timeline metric
+   * @return The latest point of the metric; empty if it holds no values
+   */
+  public static TimelineMetric getLatestSingleValueMetric(
+      TimelineMetric metric) {
+    if (metric.getType() == Type.SINGLE_VALUE) {
+      return metric;
+    } else {
+      TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE);
+      Map.Entry<Long, Number> latest =
+          (metric.values == null) ? null : metric.values.firstEntry();
+      if (latest != null) {
+        singleValueMetric.addValue(latest.getKey(), latest.getValue());
+      }
+      return singleValueMetric;
     }
-    str += "}";
-    return str;
   }
+
+  /**
+   * Return the timestamp of the only data point held by this metric.
+   *
+   * @return timestamp (map key) of the single stored value
+   */
+  public long getSingleDataTimestamp() {
+    if (this.type != Type.SINGLE_VALUE) {
+      throw new YarnRuntimeException("Type for this timeline metric is not " +
+          "SINGLE_VALUE.");
+    }
+    // Reject the empty case explicitly so the caller gets a descriptive
+    // error instead of whatever firstKey() raises on an empty map.
+    if (values.isEmpty()) {
+      throw new YarnRuntimeException("Values for this timeline metric is " +
+          "empty.");
+    }
+    return values.firstKey();
+  }
+
+  /**
+   * Return the value of the only data point held by this metric.
+   *
+   * @return the single stored value, or null if no value has been set
+   */
+  public Number getSingleDataValue() {
+    if (this.type != Type.SINGLE_VALUE) {
+      throw new YarnRuntimeException("Type for this timeline metric is not " +
+          "SINGLE_VALUE.");
+    }
+    // An empty metric yields null rather than an exception.
+    Map.Entry<Long, Number> only = values.firstEntry();
+    if (only == null) {
+      return null;
+    }
+    return only.getValue();
+  }
+
+  /**
+   * Aggregate an incoming metric to the base aggregated metric in a stateless
+   * fashion, i.e. without any operation state. The assumption here is
+   * baseAggregatedMetric and incomingMetric should be single value data if not
+   * null.
+   *
+   * @param incomingMetric Incoming timeline metric to aggregate
+   * @param baseAggregatedMetric Base timeline metric
+   * @return Result metric after aggregation
+   */
+  public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+      TimelineMetric baseAggregatedMetric) {
+    return aggregateTo(incomingMetric, baseAggregatedMetric, null);
+  }
+
+  /**
+   * Aggregate an incoming metric to the base aggregated metric with the given
+   * operation state. The assumption here is baseAggregatedMetric and
+   * incomingMetric should be single value data if not null.
+   *
+   * @param incomingMetric Incoming timeline metric to aggregate
+   * @param baseAggregatedMetric Base timeline metric
+   * @param state Operation state
+   * @return Result metric after aggregation
+   */
+  public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+      TimelineMetric baseAggregatedMetric, Map<Object, Object> state) {
+    TimelineMetricOperation operation
+        = incomingMetric.getRealtimeAggregationOp();
+    return operation.aggregate(incomingMetric, baseAggregatedMetric, state);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
new file mode 100644
index 0000000..4c9045f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * A calculator for timeline metrics.
+ */
+public final class TimelineMetricCalculator {
+
+  private TimelineMetricCalculator() {
+    // do nothing.
+  }
+
+  /**
+   * Compare two not-null numbers. An Integer/Long n1 is compared with a
+   * Float/Double n2 as doubles, so no fractional part is truncated.
+   * @param n1 Number n1 (Integer, Long, Float or Double)
+   * @param n2 Number n2
+   * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a
+   * positive int otherwise.
+   */
+  public static int compare(Number n1, Number n2) {
+    if (n1 == null || n2 == null) {
+      throw new YarnRuntimeException(
+          "Number to be compared shouldn't be null.");
+    }
+
+    boolean n1Integral = n1 instanceof Integer || n1 instanceof Long;
+    boolean n1Floating = n1 instanceof Float || n1 instanceof Double;
+    boolean n2Floating = n2 instanceof Float || n2 instanceof Double;
+
+    // Use long arithmetic only when n2 is not floating point; longValue()
+    // would otherwise truncate it (1 vs 1.5 used to compare as equal).
+    if (n1Integral && !n2Floating) {
+      return Long.compare(n1.longValue(), n2.longValue());
+    }
+
+    if (n1Integral || n1Floating) {
+      double d1 = n1.doubleValue();
+      double d2 = n2.doubleValue();
+      return (d1 == d2) ? 0 : ((d1 < d2) ? -1 : 1);
+    }
+
+    throw new YarnRuntimeException("Unsupported types for number comparison: "
+        + n1.getClass().getName() + ", " + n2.getClass().getName());
+  }
+
+  /**
+   * Subtract n2 from n1; mixed integral/floating operands yield a Double.
+   * @param n1 Number n1, must not be null
+   * @param n2 Number n2; if null, n1 is returned unchanged
+   * @return n1 - n2, or null if n1 has an unsupported type.
+   */
+  public static Number sub(Number n1, Number n2) {
+    if (n1 == null) {
+      throw new YarnRuntimeException(
+          "Number to be subtracted shouldn't be null.");
+    } else if (n2 == null) {
+      return n1;
+    }
+    boolean n1Integral = n1 instanceof Integer || n1 instanceof Long;
+    boolean n1Floating = n1 instanceof Float || n1 instanceof Double;
+    boolean n2Floating = n2 instanceof Float || n2 instanceof Double;
+    // A floating-point n2 must not be truncated through longValue().
+    if (n1Integral && !n2Floating) {
+      return n1.longValue() - n2.longValue();
+    } else if (n1Integral || n1Floating) {
+      return n1.doubleValue() - n2.doubleValue();
+    }
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+
+  /**
+   * Sum up two Numbers; mixed integral/floating operands yield a Double.
+   * @param n1 Number n1; if null, n2 is returned unchanged
+   * @param n2 Number n2; if null, n1 is returned unchanged
+   * @return n1 + n2, or null if n1 has an unsupported type.
+   */
+  public static Number sum(Number n1, Number n2) {
+    if (n1 == null) {
+      return n2;
+    } else if (n2 == null) {
+      return n1;
+    }
+    boolean n1Integral = n1 instanceof Integer || n1 instanceof Long;
+    boolean n1Floating = n1 instanceof Float || n1 instanceof Double;
+    boolean n2Floating = n2 instanceof Float || n2 instanceof Double;
+    // A floating-point n2 must not be truncated through longValue().
+    if (n1Integral && !n2Floating) {
+      return n1.longValue() + n2.longValue();
+    } else if (n1Integral || n1Floating) {
+      return n1.doubleValue() + n2.doubleValue();
+    }
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
new file mode 100644
index 0000000..58e5c38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Map;
+
+/**
+ * Aggregation operations.
+ */
+public enum TimelineMetricOperation {
+  NOP("NOP") {
+    /**
+     * Do nothing on the base metric.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return Metric b
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base, Map<Object, Object> state) {
+      return base;
+    }
+  },
+  MAX("MAX") {
+    /**
+     * Keep the greater value of incoming and base. Stateless operation.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return the greater value of a and b
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base, Map<Object, Object> state) {
+      if (base == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.getSingleDataValue();
+      Number aggregateValue = base.getSingleDataValue();
+      if (aggregateValue == null) {
+        aggregateValue = Long.MIN_VALUE;
+      }
+      if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) {
+        base.addValue(incoming.getSingleDataTimestamp(), incomingValue);
+      }
+      return base;
+    }
+  },
+  REPLACE("REPLACE") {
+    /**
+     * Replace the base metric with the incoming value. Stateless operation.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return Metric a
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base,
+        Map<Object, Object> state) {
+      return incoming;
+    }
+  },
+  SUM("SUM") {
+    /**
+     * Return the sum of the incoming metric and the base metric if the
+     * operation is stateless. For stateful operations, also subtract the
+     * value of the timeline metric mapped to the PREV_METRIC_STATE_KEY
+     * in the state object.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p)
+     * @return A metric with value a + b - p
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+        Map<Object, Object> state) {
+      if (base == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.getSingleDataValue();
+      Number aggregateValue = base.getSingleDataValue();
+      Number result
+          = TimelineMetricCalculator.sum(incomingValue, aggregateValue);
+
+      // If there is a previous value in the state, subtract it from the
+      // sum.
+      if (state != null) {
+        Object prevMetric = state.get(PREV_METRIC_STATE_KEY);
+        if (prevMetric instanceof TimelineMetric) {
+          result = TimelineMetricCalculator.sub(result,
+              ((TimelineMetric) prevMetric).getSingleDataValue());
+        }
+      }
+      base.addValue(incoming.getSingleDataTimestamp(), result);
+      return base;
+    }
+  },
+  AVG("AVERAGE") {
+    /**
+     * Return the average value of the incoming metric and the base metric,
+     * with a given state. Not supported yet.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state
+     * @return Nothing; always throws UnsupportedOperationException
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+        Map<Object, Object> state) {
+      // Not supported yet
+      throw new UnsupportedOperationException(
+          "Unsupported aggregation operation: AVERAGE");
+    }
+  };
+
+  public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
+
+  /**
+   * Perform the aggregation operation.
+   *
+   * @param incoming Incoming metric
+   * @param aggregate Base aggregation metric
+   * @param state Operation state
+   * @return Result metric for this aggregation operation
+   */
+  public TimelineMetric aggregate(TimelineMetric incoming,
+      TimelineMetric aggregate, Map<Object, Object> state) {
+    return exec(incoming, aggregate, state);
+  }
+
+  private final String opName;
+
+  TimelineMetricOperation(String opString) {
+    opName = opString;
+  }
+
+  @Override
+  public String toString() {
+    return this.opName;
+  }
+
+  abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+      Map<Object, Object> state);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
new file mode 100644
index 0000000..3244bc3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+
+import org.junit.Test;
+
+public class TestTimelineMetric {
+
+  @Test
+  public void testTimelineMetricAggregation() {
+    long ts = System.currentTimeMillis();
+    // single_value metric add against null metric
+    TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 10000L);
+    TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null);
+    assertEquals(10000L, aggregatedMetric.getSingleDataValue());
+
+    TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 20000L);
+    aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+    assertEquals(30000L, aggregatedMetric.getSingleDataValue());
+
+    // stateful sum test
+    Map<Object, Object> state = new HashMap<>();
+    state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2);
+    TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 10000L);
+    aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric,
+        state);
+    assertEquals(20000L, aggregatedMetric.getSingleDataValue());
+
+    // single_value metric max against single_value metric
+    TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, ts, 150L);
+    TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null);
+    assertEquals(150L, aggregatedMax.getSingleDataValue());
+
+    TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, ts, 170L);
+    aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax);
+    assertEquals(170L, aggregatedMax.getSingleDataValue());
+
+    // single_value metric avg against single_value metric
+    TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.AVG, ts, 150L);
+    try {
+      TimelineMetric.aggregateTo(m5, null);
+      fail("Taking average among metrics is not supported! ");
+    } catch (UnsupportedOperationException e) {
+      // Expected
+    }
+
+  }
+
+  private static TimelineMetric getSingleValueMetric(String id,
+      TimelineMetricOperation op, long timestamp, long value) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.SINGLE_VALUE);
+    m.setRealtimeAggregationOp(op);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(timestamp, value);
+    m.setValues(metricValues);
+    return m;
+  }
+
+  private static TimelineMetric getTimeSeriesMetric(String id,
+      TimelineMetricOperation op, Map<Long, Number> metricValues) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.TIME_SERIES);
+    m.setRealtimeAggregationOp(op);
+    m.setValues(metricValues);
+    return m;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 51ec762..592bfa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -64,13 +64,13 @@ public class TestTimelineServiceRecords {
         metric1.getValues().entrySet().iterator();
     Map.Entry<Long, Number> entry = itr.next();
     Assert.assertEquals(new Long(3L), entry.getKey());
-    Assert.assertEquals(new Double(3.0D), entry.getValue());
+    Assert.assertEquals(3.0D, entry.getValue());
     entry = itr.next();
     Assert.assertEquals(new Long(2L), entry.getKey());
-    Assert.assertEquals(new Integer(2), entry.getValue());
+    Assert.assertEquals(2, entry.getValue());
     entry = itr.next();
     Assert.assertEquals(new Long(1L), entry.getKey());
-    Assert.assertEquals(new Float(1.0F), entry.getValue());
+    Assert.assertEquals(1.0F, entry.getValue());
     Assert.assertFalse(itr.hasNext());
     entity.addMetric(metric1);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 4d3dafd..39a6181 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -119,12 +120,15 @@ public class NMTimelinePublisher extends CompositeService {
       if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
         TimelineMetric memoryMetric = new TimelineMetric();
         memoryMetric.setId(ContainerMetric.MEMORY.toString());
+        memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
         memoryMetric.addValue(currentTimeMillis, pmemUsage);
         entity.addMetric(memoryMetric);
       }
       if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
         TimelineMetric cpuMetric = new TimelineMetric();
         cpuMetric.setId(ContainerMetric.CPU.toString());
+        // TODO: support average
+        cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
         cpuMetric.addValue(currentTimeMillis,
             Math.round(cpuUsagePercentPerCore));
         entity.addMetric(cpuMetric);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 4fe445a..eb05262 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,15 +18,26 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.google.common.base.Preconditions;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Service that handles writes to the timeline service and writes them to the
  * backing storage for a given YARN application.
@@ -36,8 +47,16 @@ import com.google.common.base.Preconditions;
 @Private
 @Unstable
 public class AppLevelTimelineCollector extends TimelineCollector {
+  private static final Log LOG =
+      LogFactory.getLog(AppLevelTimelineCollector.class);
+
+  private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
+  private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
+  private static Set<String> entityTypesSkipAggregation = initializeSkipSet();
+
   private final ApplicationId appId;
   private final TimelineCollectorContext context;
+  private ScheduledThreadPoolExecutor appAggregationExecutor;
 
   public AppLevelTimelineCollector(ApplicationId appId) {
     super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@@ -46,6 +65,14 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     context = new TimelineCollectorContext();
   }
 
+  private static Set<String> initializeSkipSet() {
+    Set<String> result = new HashSet<>();
+    result.add(TimelineEntityType.YARN_APPLICATION.toString());
+    result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
+    result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    return result;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
@@ -60,11 +87,25 @@ public class AppLevelTimelineCollector extends TimelineCollector {
 
   @Override
   protected void serviceStart() throws Exception {
+    // Launch the aggregation thread
+    appAggregationExecutor = new ScheduledThreadPoolExecutor(
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
+        new ThreadFactoryBuilder()
+            .setNameFormat("TimelineCollector Aggregation thread #%d")
+            .build());
+    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+        TimeUnit.SECONDS);
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
+    appAggregationExecutor.shutdown();
+    if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+      LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
+      appAggregationExecutor.shutdownNow();
+    }
     super.serviceStop();
   }
 
@@ -73,4 +114,35 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     return context;
   }
 
+  @Override
+  protected Set<String> getEntityTypesSkipAggregation() {
+    return entityTypesSkipAggregation;
+  }
+
+  private class AppLevelAggregator implements Runnable {
+
+    @Override
+    public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregating");
+      }
+      try {
+        TimelineCollectorContext currContext = getTimelineEntityContext();
+        TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
+            getAggregationGroups(), currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(resultEntity);
+        getWriter().write(currContext.getClusterId(), currContext.getUserId(),
+            currContext.getFlowName(), currContext.getFlowVersion(),
+            currContext.getFlowRunId(), currContext.getAppId(), entities);
+      } catch (Exception e) {
+        LOG.error("Error aggregating timeline metrics", e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregation complete");
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 15187d1..8cd645c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -19,6 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +33,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
@@ -41,9 +50,15 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 @Private
 @Unstable
 public abstract class TimelineCollector extends CompositeService {
+
   private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+  public static final String SEPARATOR = "_";
 
   private TimelineWriter writer;
+  private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+      = new ConcurrentHashMap<>();
+  private static Set<String> entityTypesSkipAggregation
+      = new HashSet<>();
 
   public TimelineCollector(String name) {
     super(name);
@@ -68,6 +83,28 @@ public abstract class TimelineCollector extends CompositeService {
     this.writer = w;
   }
 
+  protected TimelineWriter getWriter() {
+    return writer;
+  }
+
+  protected Map<String, AggregationStatusTable> getAggregationGroups() {
+    return aggregationGroups;
+  }
+
+  /**
+   * Method to decide the set of timeline entity types the collector should
+   * skip on aggregations. Subclasses may want to override this method to
+   * customize their own behaviors.
+   *
+   * @return The set of entity type names the collector should skip.
+   */
+  protected Set<String> getEntityTypesSkipAggregation() {
+    return entityTypesSkipAggregation;
+  }
+
+  public abstract TimelineCollectorContext getTimelineEntityContext();
+
+
   /**
    * Handles entity writes. These writes are synchronous and are written to the
    * backing storage without buffering/batching. If any entity already exists,
@@ -90,8 +127,12 @@ public abstract class TimelineCollector extends CompositeService {
       LOG.debug("putEntities(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
-
     TimelineCollectorContext context = getTimelineEntityContext();
+
+    // Update application metrics for aggregation
+    updateAggregateStatus(entities, aggregationGroups,
+        getEntityTypesSkipAggregation());
+
     return writer.write(context.getClusterId(), context.getUserId(),
         context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
         context.getAppId(), entities);
@@ -117,6 +158,174 @@ public abstract class TimelineCollector extends CompositeService {
     }
   }
 
-  public abstract TimelineCollectorContext getTimelineEntityContext();
+  /**
+   * Aggregate all metrics in the given entities, starting from an empty state.
+   *
+   * @param entities Entities to aggregate
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @param needsGroupIdInResult Whether each aggregated metric id carries its
+   *                             aggregation group id (the entity type).
+   * @return A timeline entity that contains all aggregated metrics.
+   */
+  public static TimelineEntity aggregateEntities(
+      TimelineEntities entities, String resultEntityId,
+      String resultEntityType, boolean needsGroupIdInResult) {
+    ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+        = new ConcurrentHashMap<>();
+    updateAggregateStatus(entities, aggregationGroups, null);
+    if (needsGroupIdInResult) {
+      return aggregate(aggregationGroups, resultEntityId, resultEntityType);
+    } else {
+      return aggregateWithoutGroupId(
+          aggregationGroups, resultEntityId, resultEntityType);
+    }
+  }
 
+  /**
+   * Update the per-entity-type aggregation status tables with new entities.
+   *
+   * @param entities Entities to record; those without metrics are ignored
+   * @param aggregationGroups Aggregation status tables, keyed by entity type
+   * @param typesToSkip Entity types to ignore; {@code null} ignores none
+   */
+  static void updateAggregateStatus(
+      TimelineEntities entities,
+      ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
+      Set<String> typesToSkip) {
+    for (TimelineEntity e : entities.getEntities()) {
+      if ((typesToSkip != null && typesToSkip.contains(e.getType()))
+          || e.getMetrics().isEmpty()) {
+        continue;
+      }
+      AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
+      if (aggrTable == null) {
+        AggregationStatusTable table = new AggregationStatusTable();
+        aggrTable = aggregationGroups.putIfAbsent(e.getType(),
+            table);
+        if (aggrTable == null) {
+          aggrTable = table;
+        }
+      }
+      aggrTable.update(e);
+    }
+  }
+
+  /**
+   * Aggregate internal status and generate timeline entities for the
+   * aggregation results.
+   *
+   * @param aggregationGroups Aggregation status table
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @return A timeline entity that contains all aggregated TimelineMetric.
+   */
+  static TimelineEntity aggregate(
+      Map<String, AggregationStatusTable> aggregationGroups,
+      String resultEntityId, String resultEntityType) {
+    TimelineEntity result = new TimelineEntity();
+    result.setId(resultEntityId);
+    result.setType(resultEntityType);
+    for (Map.Entry<String, AggregationStatusTable> entry
+        : aggregationGroups.entrySet()) {
+      entry.getValue().aggregateAllTo(result, entry.getKey());
+    }
+    return result;
+  }
+
+  /**
+   * Aggregate internal status and generate timeline entities for the
+   * aggregation results. The result metrics will not have aggregation group
+   * information.
+   *
+   * @param aggregationGroups Aggregation status table
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @return A timeline entity that contains all aggregated TimelineMetric.
+   */
+  static TimelineEntity aggregateWithoutGroupId(
+      Map<String, AggregationStatusTable> aggregationGroups,
+      String resultEntityId, String resultEntityType) {
+    TimelineEntity result = new TimelineEntity();
+    result.setId(resultEntityId);
+    result.setType(resultEntityType);
+    for (Map.Entry<String, AggregationStatusTable> entry
+        : aggregationGroups.entrySet()) {
+      entry.getValue().aggregateAllTo(result, "");
+    }
+    return result;
+  }
+
+  // Note: In memory aggregation is performed in an eventually consistent
+  // fashion.
+  private static class AggregationStatusTable {
+    // On aggregation, for each metric, aggregate all per-entity accumulated
+    // metrics. We only use the id and type for TimelineMetrics in the key set
+    // of this table.
+    private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
+        aggregateTable;
+
+    public AggregationStatusTable() {
+      aggregateTable = new ConcurrentHashMap<>();
+    }
+
+    public void update(TimelineEntity incoming) {
+      String entityId = incoming.getId();
+      for (TimelineMetric m : incoming.getMetrics()) {
+        // Skip if the metric does not need aggregation
+        if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+          continue;
+        }
+        // Update aggregateTable
+        Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
+        if (aggrRow == null) {
+          Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>();
+          aggrRow = aggregateTable.putIfAbsent(m, tempRow);
+          if (aggrRow == null) {
+            aggrRow = tempRow;
+          }
+        }
+        aggrRow.put(entityId, m);
+      }
+    }
+
+    public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
+        String aggregationGroupId) {
+      if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+        return e;
+      }
+      Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
+      if (aggrRow != null) {
+        TimelineMetric aggrMetric = new TimelineMetric();
+        if (aggregationGroupId.length() > 0) {
+          aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
+        } else {
+          aggrMetric.setId(metric.getId());
+        }
+        aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
+        Map<Object, Object> status = new HashMap<>();
+        for (TimelineMetric m : aggrRow.values()) {
+          TimelineMetric.aggregateTo(m, aggrMetric, status);
+          // getRealtimeAggregationOp returns an enum so we can directly
+          // compare with "!=".
+          if (m.getRealtimeAggregationOp()
+              != aggrMetric.getRealtimeAggregationOp()) {
+            aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+          }
+        }
+        Set<TimelineMetric> metrics = e.getMetrics();
+        metrics.remove(aggrMetric);
+        metrics.add(aggrMetric);
+      }
+      return e;
+    }
+
+    public TimelineEntity aggregateAllTo(TimelineEntity e,
+        String aggregationGroupId) {
+      for (TimelineMetric m : aggregateTable.keySet()) {
+        aggregateTo(m, e, aggregationGroupId);
+      }
+      return e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
index f0b1e47..6a1e086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
  *
  */
 public enum TimelineAggregationTrack {
-  FLOW, USER, QUEUE
+  APP, FLOW, USER, QUEUE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
new file mode 100644
index 0000000..5b4dc50
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests the static metric aggregation helpers of TimelineCollector. */
+public class TestTimelineCollector {
+
+  // Builds groups * entities test entities; group j has entity type TEST_j.
+  private TimelineEntities generateTestEntities(int groups, int entities) {
+    TimelineEntities te = new TimelineEntities();
+    for (int j = 0; j < groups; j++) {
+      for (int i = 0; i < entities; i++) {
+        TimelineEntity entity = new TimelineEntity();
+        String containerId = "container_1000178881110_2002_" + i;
+        entity.setId(containerId);
+        String entityType = "TEST_" + j;
+        entity.setType(entityType);
+        long cTime = 1425016501000L;
+        entity.setCreatedTime(cTime);
+
+        // add metrics
+        Set<TimelineMetric> metrics = new HashSet<>();
+        TimelineMetric m1 = new TimelineMetric();
+        m1.setId("HDFS_BYTES_WRITE");
+        m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+        long ts = System.currentTimeMillis();
+        m1.addValue(ts - 20000, 100L);
+        metrics.add(m1);
+
+        TimelineMetric m2 = new TimelineMetric();
+        m2.setId("VCORES_USED");
+        m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+        m2.addValue(ts - 20000, 3L);
+        metrics.add(m2);
+
+        // m3 should not show up in the aggregation
+        TimelineMetric m3 = new TimelineMetric();
+        m3.setId("UNRELATED_VALUES");
+        m3.addValue(ts - 20000, 3L);
+        metrics.add(m3);
+
+        TimelineMetric m4 = new TimelineMetric();
+        m4.setId("TXN_FINISH_TIME");
+        m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX);
+        m4.addValue(ts - 20000, i);
+        metrics.add(m4);
+
+        entity.addMetrics(metrics);
+        te.addEntity(entity);
+      }
+    }
+
+    return te;
+  }
+
+  @Test
+  public void testAggregation() throws Exception {
+    // Test aggregation with multiple groups.
+    int groups = 3;
+    int n = 50;
+    TimelineEntities testEntities = generateTestEntities(groups, n);
+    TimelineEntity resultEntity = TimelineCollector.aggregateEntities(
+        testEntities, "test_result", "TEST_AGGR", true);
+    assertEquals(groups * 3, resultEntity.getMetrics().size());
+
+    // Result metric ids carry a group suffix here, hence the prefix match.
+    // All groups are fed identical data, so one pass covers every group.
+    for (TimelineMetric m : resultEntity.getMetrics()) {
+      if (m.getId().startsWith("HDFS_BYTES_WRITE")) {
+        assertEquals(100 * n, m.getSingleDataValue().intValue());
+      } else if (m.getId().startsWith("VCORES_USED")) {
+        assertEquals(3 * n, m.getSingleDataValue().intValue());
+      } else if (m.getId().startsWith("TXN_FINISH_TIME")) {
+        assertEquals(n - 1, m.getSingleDataValue());
+      } else {
+        fail("Unrecognized metric! " + m.getId());
+      }
+    }
+
+    // Test aggregation with a single group.
+    TimelineEntities testEntities1 = generateTestEntities(1, n);
+    TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities(
+        testEntities1, "test_result", "TEST_AGGR", false);
+    assertEquals(3, resultEntity1.getMetrics().size());
+
+    Set<TimelineMetric> metrics = resultEntity1.getMetrics();
+    for (TimelineMetric m : metrics) {
+      if (m.getId().equals("HDFS_BYTES_WRITE")) {
+        assertEquals(100 * n, m.getSingleDataValue().intValue());
+      } else if (m.getId().equals("VCORES_USED")) {
+        assertEquals(3 * n, m.getSingleDataValue().intValue());
+      } else if (m.getId().equals("TXN_FINISH_TIME")) {
+        assertEquals(n - 1, m.getSingleDataValue());
+      } else {
+        fail("Unrecognized metric! " + m.getId());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 5ce7d3b..2f79daa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -25,11 +25,15 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Test;
@@ -51,6 +55,26 @@ public class TestFileSystemTimelineWriterImpl {
     entity.setCreatedTime(1425016501000L);
     te.addEntity(entity);
 
+    TimelineMetric metric = new TimelineMetric();
+    String metricId = "CPU";
+    metric.setId(metricId);
+    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    metric.addValue(1425016501000L, 1234567L);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "metric";
+    String type2 = "app";
+    entity2.setId(id2);
+    entity2.setType(type2);
+    entity2.setCreatedTime(1425016503000L);
+    entity2.addMetric(metric);
+    te.addEntity(entity2);
+
+    Map<String, TimelineMetric> aggregatedMetrics =
+        new HashMap<String, TimelineMetric>();
+    aggregatedMetrics.put(metricId, metric);
+
     FileSystemTimelineWriterImpl fsi = null;
     try {
       fsi = new FileSystemTimelineWriterImpl();
@@ -68,11 +92,27 @@ public class TestFileSystemTimelineWriterImpl {
       assertTrue(f.exists() && !f.isDirectory());
       List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
       // ensure there's only one entity + 1 new line
-      assertTrue(data.size() == 2);
+      assertTrue("data size is:" + data.size(), data.size() == 2);
       String d = data.get(0);
       // confirm the contents same as what was written
       assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
 
+      // verify aggregated metrics
+      String fileName2 = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
+          + type2 + "/" + id2 +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path2 = Paths.get(fileName2);
+      File file = new File(fileName2);
+      assertTrue(file.exists() && !file.isDirectory());
+      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertTrue("data size is:" + data.size(), data2.size() == 2);
+      String metricToString = data2.get(0);
+      // confirm the contents same as what was written
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+
       // delete the directory
       File outputDir = new File(fsi.getOutputRoot());
       FileUtils.deleteDirectory(outputDir);
@@ -84,4 +124,5 @@ public class TestFileSystemTimelineWriterImpl {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 6b57ec4..8ab54bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -539,6 +540,26 @@ public class TestHBaseTimelineStorage {
     metrics.add(m1);
     entity.addMetrics(metrics);
 
+    // add aggregated metrics
+    TimelineEntity aggEntity = new TimelineEntity();
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    aggEntity.setId(appId);
+    aggEntity.setType(type);
+    long cTime2 = 1425016502000L;
+    long mTime2 = 1425026902000L;
+    aggEntity.setCreatedTime(cTime2);
+
+    TimelineMetric aggMetric = new TimelineMetric();
+    aggMetric.setId("MEM_USAGE");
+    Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    aggMetricValues.put(ts - 120000, 102400000);
+    aggMetric.setType(Type.SINGLE_VALUE);
+    aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    aggMetric.setValues(aggMetricValues);
+    Set<TimelineMetric> aggMetrics = new HashSet<>();
+    aggMetrics.add(aggMetric);
+    entity.addMetrics(aggMetrics);
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -564,7 +585,7 @@ public class TestHBaseTimelineStorage {
       Result result = new ApplicationTable().getResult(c1, conn, get);
 
       assertTrue(result != null);
-      assertEquals(15, result.size());
+      assertEquals(16, result.size());
 
       // check the row key
       byte[] row1 = result.getRow();
@@ -652,10 +673,17 @@ public class TestHBaseTimelineStorage {
       assertEquals(conf, conf2);
 
       Set<TimelineMetric> metrics2 = e1.getMetrics();
-      assertEquals(metrics, metrics2);
+      assertEquals(2, metrics2.size());
       for (TimelineMetric metric2 : metrics2) {
         Map<Long, Number> metricValues2 = metric2.getValues();
-        matchMetrics(metricValues, metricValues2);
+        assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
+            metric2.getId().equals("MEM_USAGE"));
+        if (metric2.getId().equals("MAP_SLOT_MILLIS")) {
+          matchMetrics(metricValues, metricValues2);
+        }
+        if (metric2.getId().equals("MEM_USAGE")) {
+          matchMetrics(aggMetricValues, metricValues2);
+        }
       }
     } finally {
       if (hbi != null) {
@@ -724,7 +752,6 @@ public class TestHBaseTimelineStorage {
     m1.setValues(metricValues);
     metrics.add(m1);
     entity.addMetrics(metrics);
-
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org