You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:54 UTC
[08/50] [abbrv] hadoop git commit: YARN-3816. [Aggregation] App-level
aggregation and accumulation for YARN system metrics (Li Lu via sjlee)
YARN-3816. [Aggregation] App-level aggregation and accumulation for YARN system metrics (Li Lu via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/78ffdf08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/78ffdf08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/78ffdf08
Branch: refs/heads/YARN-2928
Commit: 78ffdf087a787aef6a105a98094816ac7f8a4889
Parents: a5bf4fa
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Apr 22 10:24:40 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:06 2016 -0700
----------------------------------------------------------------------
.../records/timelineservice/TimelineMetric.java | 140 ++++++++++--
.../TimelineMetricCalculator.java | 115 ++++++++++
.../TimelineMetricOperation.java | 167 +++++++++++++++
.../timelineservice/TestTimelineMetric.java | 100 +++++++++
.../TestTimelineServiceRecords.java | 6 +-
.../timelineservice/NMTimelinePublisher.java | 4 +
.../collector/AppLevelTimelineCollector.java | 72 +++++++
.../collector/TimelineCollector.java | 213 ++++++++++++++++++-
.../storage/TimelineAggregationTrack.java | 2 +-
.../collector/TestTimelineCollector.java | 127 +++++++++++
.../TestFileSystemTimelineWriterImpl.java | 43 +++-
.../storage/TestHBaseTimelineStorage.java | 35 ++-
12 files changed, 998 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
index 2f60515..f0c6849 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
@@ -48,13 +49,13 @@ public class TimelineMetric {
private Type type;
private String id;
- private Comparator<Long> reverseComparator = new Comparator<Long>() {
- @Override
- public int compare(Long l1, Long l2) {
- return l2.compareTo(l1);
- }
- };
- private TreeMap<Long, Number> values = new TreeMap<>(reverseComparator);
+ // By default, not to do any aggregation operations. This field will NOT be
+ // persisted (like a "transient" member).
+ private TimelineMetricOperation realtimeAggregationOp
+ = TimelineMetricOperation.NOP;
+
+ private TreeMap<Long, Number> values
+ = new TreeMap<>(Collections.reverseOrder());
public TimelineMetric() {
this(Type.SINGLE_VALUE);
@@ -83,6 +84,26 @@ public class TimelineMetric {
this.id = metricId;
}
+ /**
+ * Get the real time aggregation operation of this metric.
+ *
+ * @return Real time aggregation operation
+ */
+ public TimelineMetricOperation getRealtimeAggregationOp() {
+ return realtimeAggregationOp;
+ }
+
+ /**
+ * Set the real time aggregation operation of this metric.
+ *
+ * @param op A timeline metric operation that the metric should perform on
+ * real time aggregations
+ */
+ public void setRealtimeAggregationOp(
+ final TimelineMetricOperation op) {
+ this.realtimeAggregationOp = op;
+ }
+
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "values")
@@ -98,8 +119,8 @@ public class TimelineMetric {
if (type == Type.SINGLE_VALUE) {
overwrite(vals);
} else {
- if (values != null) {
- this.values = new TreeMap<Long, Number>(reverseComparator);
+ if (vals != null) {
+ this.values = new TreeMap<>(Collections.reverseOrder());
this.values.putAll(vals);
} else {
this.values = null;
@@ -166,11 +187,100 @@ public class TimelineMetric {
@Override
public String toString() {
- String str = "{id:" + id + ", type:" + type;
- if (!values.isEmpty()) {
- str += ", values:" + values;
+ return "{id: " + id + ", type: " + type +
+ ", realtimeAggregationOp: " +
+ realtimeAggregationOp + "; " + values.toString() +
+ "}";
+ }
+
+ /**
+ * Get the latest timeline metric as single value type.
+ *
+ * @param metric Incoming timeline metric
+ * @return The latest metric in the incoming metric
+ */
+ public static TimelineMetric getLatestSingleValueMetric(
+ TimelineMetric metric) {
+ if (metric.getType() == Type.SINGLE_VALUE) {
+ return metric;
+ } else {
+ TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE);
+ Long firstKey = metric.values.firstKey();
+ if (firstKey != null) {
+ Number firstValue = metric.values.get(firstKey);
+ singleValueMetric.addValue(firstKey, firstValue);
+ }
+ return singleValueMetric;
}
- str += "}";
- return str;
}
+
+ /**
+ * Get single data timestamp of the metric.
+ *
+ * @return the single data timestamp
+ */
+ public long getSingleDataTimestamp() {
+ if (this.type == Type.SINGLE_VALUE) {
+ if (values.size() == 0) {
+ throw new YarnRuntimeException("Values for this timeline metric is " +
+ "empty.");
+ } else {
+ return values.firstKey();
+ }
+ } else {
+ throw new YarnRuntimeException("Type for this timeline metric is not " +
+ "SINGLE_VALUE.");
+ }
+ }
+
+ /**
+ * Get single data value of the metric.
+ *
+ * @return the single data value
+ */
+ public Number getSingleDataValue() {
+ if (this.type == Type.SINGLE_VALUE) {
+ if (values.size() == 0) {
+ return null;
+ } else {
+ return values.get(values.firstKey());
+ }
+ } else {
+ throw new YarnRuntimeException("Type for this timeline metric is not " +
+ "SINGLE_VALUE.");
+ }
+ }
+
+ /**
+ * Aggregate an incoming metric to the base aggregated metric with the given
+ * operation state in a stateless fashion. The assumption here is
+ * baseAggregatedMetric and latestMetric should be single value data if not
+ * null.
+ *
+ * @param incomingMetric Incoming timeline metric to aggregate
+ * @param baseAggregatedMetric Base timeline metric
+ * @return Result metric after aggregation
+ */
+ public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+ TimelineMetric baseAggregatedMetric) {
+ return aggregateTo(incomingMetric, baseAggregatedMetric, null);
+ }
+
+ /**
+ * Aggregate an incoming metric to the base aggregated metric with the given
+ * operation state. The assumption here is baseAggregatedMetric and
+ * latestMetric should be single value data if not null.
+ *
+ * @param incomingMetric Incoming timeline metric to aggregate
+ * @param baseAggregatedMetric Base timeline metric
+ * @param state Operation state
+ * @return Result metric after aggregation
+ */
+ public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+ TimelineMetric baseAggregatedMetric, Map<Object, Object> state) {
+ TimelineMetricOperation operation
+ = incomingMetric.getRealtimeAggregationOp();
+ return operation.aggregate(incomingMetric, baseAggregatedMetric, state);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
new file mode 100644
index 0000000..4c9045f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * A calculator for timeline metrics.
+ */
+public final class TimelineMetricCalculator {
+
+ private TimelineMetricCalculator() {
+ // do nothing.
+ }
+
+ /**
+ * Compare two not-null numbers.
+ * @param n1 Number n1
+ * @param n2 Number n2
+ * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a
+ * positive int otherwise.
+ */
+ public static int compare(Number n1, Number n2) {
+ if (n1 == null || n2 == null) {
+ throw new YarnRuntimeException(
+ "Number to be compared shouldn't be null.");
+ }
+
+ if (n1 instanceof Integer || n1 instanceof Long) {
+ if (n1.longValue() == n2.longValue()) {
+ return 0;
+ } else {
+ return (n1.longValue() < n2.longValue()) ? -1 : 1;
+ }
+ }
+
+ if (n1 instanceof Float || n1 instanceof Double) {
+ if (n1.doubleValue() == n2.doubleValue()) {
+ return 0;
+ } else {
+ return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1;
+ }
+ }
+
+ // TODO throw warnings/exceptions for other types of number.
+ throw new YarnRuntimeException("Unsupported types for number comparison: "
+ + n1.getClass().getName() + ", " + n2.getClass().getName());
+ }
+
+ /**
+ * Subtract operation between two Numbers.
+ * @param n1 Number n1
+ * @param n2 Number n2
+ * @return Number represent to (n1 - n2).
+ */
+ public static Number sub(Number n1, Number n2) {
+ if (n1 == null) {
+ throw new YarnRuntimeException(
+ "Number to be subtracted shouldn't be null.");
+ } else if (n2 == null) {
+ return n1;
+ }
+
+ if (n1 instanceof Integer || n1 instanceof Long) {
+ return n1.longValue() - n2.longValue();
+ }
+
+ if (n1 instanceof Float || n1 instanceof Double) {
+ return n1.doubleValue() - n2.doubleValue();
+ }
+
+ // TODO throw warnings/exceptions for other types of number.
+ return null;
+ }
+
+ /**
+ * Sum up two Numbers.
+ * @param n1 Number n1
+ * @param n2 Number n2
+ * @return Number represent to (n1 + n2).
+ */
+ public static Number sum(Number n1, Number n2) {
+ if (n1 == null) {
+ return n2;
+ } else if (n2 == null) {
+ return n1;
+ }
+
+ if (n1 instanceof Integer || n1 instanceof Long) {
+ return n1.longValue() + n2.longValue();
+ }
+
+ if (n1 instanceof Float || n1 instanceof Double) {
+ return n1.doubleValue() + n2.doubleValue();
+ }
+
+ // TODO throw warnings/exceptions for other types of number.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
new file mode 100644
index 0000000..58e5c38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Map;
+
+/**
+ * Aggregation operations.
+ */
+public enum TimelineMetricOperation {
+ NOP("NOP") {
+ /**
+ * Do nothing on the base metric.
+ *
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state (not used)
+ * @return Metric b
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map<Object, Object> state) {
+ return base;
+ }
+ },
+ MAX("MAX") {
+ /**
+ * Keep the greater value of incoming and base. Stateless operation.
+ *
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state (not used)
+ * @return the greater value of a and b
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map<Object, Object> state) {
+ if (base == null) {
+ return incoming;
+ }
+ Number incomingValue = incoming.getSingleDataValue();
+ Number aggregateValue = base.getSingleDataValue();
+ if (aggregateValue == null) {
+ aggregateValue = Long.MIN_VALUE;
+ }
+ if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) {
+ base.addValue(incoming.getSingleDataTimestamp(), incomingValue);
+ }
+ return base;
+ }
+ },
+ REPLACE("REPLACE") {
+ /**
+ * Replace the base metric with the incoming value. Stateless operation.
+ *
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state (not used)
+ * @return Metric a
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base,
+ Map<Object, Object> state) {
+ return incoming;
+ }
+ },
+ SUM("SUM") {
+ /**
+ * Return the sum of the incoming metric and the base metric if the
+ * operation is stateless. For stateful operations, also subtract the
+ * value of the timeline metric mapped to the PREV_METRIC_STATE_KEY
+ * in the state object.
+ *
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p)
+ * @return A metric with value a + b - p
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+ Map<Object, Object> state) {
+ if (base == null) {
+ return incoming;
+ }
+ Number incomingValue = incoming.getSingleDataValue();
+ Number aggregateValue = base.getSingleDataValue();
+ Number result
+ = TimelineMetricCalculator.sum(incomingValue, aggregateValue);
+
+ // If there are previous value in the state, we will take it off from the
+ // sum
+ if (state != null) {
+ Object prevMetric = state.get(PREV_METRIC_STATE_KEY);
+ if (prevMetric instanceof TimelineMetric) {
+ result = TimelineMetricCalculator.sub(result,
+ ((TimelineMetric) prevMetric).getSingleDataValue());
+ }
+ }
+ base.addValue(incoming.getSingleDataTimestamp(), result);
+ return base;
+ }
+ },
+ AVG("AVERAGE") {
+ /**
+ * Return the average value of the incoming metric and the base metric,
+ * with a given state. Not supported yet.
+ *
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state
+ * @return Not finished yet
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+ Map<Object, Object> state) {
+ // Not supported yet
+ throw new UnsupportedOperationException(
+ "Unsupported aggregation operation: AVERAGE");
+ }
+ };
+
+ public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
+
+ /**
+ * Perform the aggregation operation.
+ *
+ * @param incoming Incoming metric
+ * @param aggregate Base aggregation metric
+ * @param state Operation state
+ * @return Result metric for this aggregation operation
+ */
+ public TimelineMetric aggregate(TimelineMetric incoming,
+ TimelineMetric aggregate, Map<Object, Object> state) {
+ return exec(incoming, aggregate, state);
+ }
+
+ private final String opName;
+
+ TimelineMetricOperation(String opString) {
+ opName = opString;
+ }
+
+ @Override
+ public String toString() {
+ return this.opName;
+ }
+
+ abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+ Map<Object, Object> state);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
new file mode 100644
index 0000000..3244bc3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+
+import org.junit.Test;
+
+public class TestTimelineMetric {
+
+ @Test
+ public void testTimelineMetricAggregation() {
+ long ts = System.currentTimeMillis();
+ // single_value metric add against null metric
+ TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+ TimelineMetricOperation.SUM, ts, 10000L);
+ TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null);
+ assertEquals(10000L, aggregatedMetric.getSingleDataValue());
+
+ TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+ TimelineMetricOperation.SUM, ts, 20000L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+ assertEquals(30000L, aggregatedMetric.getSingleDataValue());
+
+ // stateful sum test
+ Map<Object, Object> state = new HashMap<>();
+ state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2);
+ TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS",
+ TimelineMetricOperation.SUM, ts, 10000L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric,
+ state);
+ assertEquals(20000L, aggregatedMetric.getSingleDataValue());
+
+ // single_value metric max against single_value metric
+ TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE",
+ TimelineMetricOperation.MAX, ts, 150L);
+ TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null);
+ assertEquals(150L, aggregatedMax.getSingleDataValue());
+
+ TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE",
+ TimelineMetricOperation.MAX, ts, 170L);
+ aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax);
+ assertEquals(170L, aggregatedMax.getSingleDataValue());
+
+ // single_value metric avg against single_value metric
+ TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE",
+ TimelineMetricOperation.AVG, ts, 150L);
+ try {
+ TimelineMetric.aggregateTo(m5, null);
+ fail("Taking average among metrics is not supported! ");
+ } catch (UnsupportedOperationException e) {
+ // Expected
+ }
+
+ }
+
+ private static TimelineMetric getSingleValueMetric(String id,
+ TimelineMetricOperation op, long timestamp, long value) {
+ TimelineMetric m = new TimelineMetric();
+ m.setId(id);
+ m.setType(Type.SINGLE_VALUE);
+ m.setRealtimeAggregationOp(op);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ metricValues.put(timestamp, value);
+ m.setValues(metricValues);
+ return m;
+ }
+
+ private static TimelineMetric getTimeSeriesMetric(String id,
+ TimelineMetricOperation op, Map<Long, Number> metricValues) {
+ TimelineMetric m = new TimelineMetric();
+ m.setId(id);
+ m.setType(Type.TIME_SERIES);
+ m.setRealtimeAggregationOp(op);
+ m.setValues(metricValues);
+ return m;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 51ec762..592bfa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -64,13 +64,13 @@ public class TestTimelineServiceRecords {
metric1.getValues().entrySet().iterator();
Map.Entry<Long, Number> entry = itr.next();
Assert.assertEquals(new Long(3L), entry.getKey());
- Assert.assertEquals(new Double(3.0D), entry.getValue());
+ Assert.assertEquals(3.0D, entry.getValue());
entry = itr.next();
Assert.assertEquals(new Long(2L), entry.getKey());
- Assert.assertEquals(new Integer(2), entry.getValue());
+ Assert.assertEquals(2, entry.getValue());
entry = itr.next();
Assert.assertEquals(new Long(1L), entry.getKey());
- Assert.assertEquals(new Float(1.0F), entry.getValue());
+ Assert.assertEquals(1.0F, entry.getValue());
Assert.assertFalse(itr.hasNext());
entity.addMetric(metric1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 4d3dafd..39a6181 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -119,12 +120,15 @@ public class NMTimelinePublisher extends CompositeService {
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric memoryMetric = new TimelineMetric();
memoryMetric.setId(ContainerMetric.MEMORY.toString());
+ memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
memoryMetric.addValue(currentTimeMillis, pmemUsage);
entity.addMetric(memoryMetric);
}
if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric cpuMetric = new TimelineMetric();
cpuMetric.setId(ContainerMetric.CPU.toString());
+ // TODO: support average
+ cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
cpuMetric.addValue(currentTimeMillis,
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 4fe445a..eb05262 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,15 +18,26 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Preconditions;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage for a given YARN application.
@@ -36,8 +47,16 @@ import com.google.common.base.Preconditions;
@Private
@Unstable
public class AppLevelTimelineCollector extends TimelineCollector {
+ private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+
+ private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
+ private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
+ private static Set<String> entityTypesSkipAggregation
+ = initializeSkipSet();
+
private final ApplicationId appId;
private final TimelineCollectorContext context;
+ private ScheduledThreadPoolExecutor appAggregationExecutor;
public AppLevelTimelineCollector(ApplicationId appId) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@@ -46,6 +65,14 @@ public class AppLevelTimelineCollector extends TimelineCollector {
context = new TimelineCollectorContext();
}
+ private static Set<String> initializeSkipSet() {
+ Set<String> result = new HashSet<>();
+ result.add(TimelineEntityType.YARN_APPLICATION.toString());
+ result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
+ result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+ return result;
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
@@ -60,11 +87,25 @@ public class AppLevelTimelineCollector extends TimelineCollector {
@Override
protected void serviceStart() throws Exception {
+ // Launch the aggregation thread
+ appAggregationExecutor = new ScheduledThreadPoolExecutor(
+ AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
+ new ThreadFactoryBuilder()
+ .setNameFormat("TimelineCollector Aggregation thread #%d")
+ .build());
+ appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
+ AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+ TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
+ appAggregationExecutor.shutdown();
+ if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
+ appAggregationExecutor.shutdownNow();
+ }
super.serviceStop();
}
@@ -73,4 +114,35 @@ public class AppLevelTimelineCollector extends TimelineCollector {
return context;
}
+ @Override
+ protected Set<String> getEntityTypesSkipAggregation() {
+ return entityTypesSkipAggregation;
+ }
+
+ private class AppLevelAggregator implements Runnable {
+
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("App-level real-time aggregating");
+ }
+ try {
+ TimelineCollectorContext currContext = getTimelineEntityContext();
+ TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
+ getAggregationGroups(), currContext.getAppId(),
+ TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEntities entities = new TimelineEntities();
+ entities.addEntity(resultEntity);
+ getWriter().write(currContext.getClusterId(), currContext.getUserId(),
+ currContext.getFlowName(), currContext.getFlowVersion(),
+ currContext.getFlowRunId(), currContext.getAppId(), entities);
+ } catch (Exception e) {
+ LOG.error("Error aggregating timeline metrics", e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("App-level real-time aggregation complete");
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 15187d1..8cd645c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -19,6 +19,12 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,7 +33,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
@@ -41,9 +50,15 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
@Private
@Unstable
public abstract class TimelineCollector extends CompositeService {
+
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+ public static final String SEPARATOR = "_";
private TimelineWriter writer;
+ private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+ = new ConcurrentHashMap<>();
+ private static Set<String> entityTypesSkipAggregation
+ = new HashSet<>();
public TimelineCollector(String name) {
super(name);
@@ -68,6 +83,28 @@ public abstract class TimelineCollector extends CompositeService {
this.writer = w;
}
+ protected TimelineWriter getWriter() {
+ return writer;
+ }
+
+ protected Map<String, AggregationStatusTable> getAggregationGroups() {
+ return aggregationGroups;
+ }
+
+ /**
+ * Method to decide the set of timeline entity types the collector should
+ * skip on aggregations. Subclasses may want to override this method to
+ * customize their own behaviors.
+ *
+ * @return A set of strings consists of all types the collector should skip.
+ */
+ protected Set<String> getEntityTypesSkipAggregation() {
+ return entityTypesSkipAggregation;
+ }
+
+ public abstract TimelineCollectorContext getTimelineEntityContext();
+
+
/**
* Handles entity writes. These writes are synchronous and are written to the
* backing storage without buffering/batching. If any entity already exists,
@@ -90,8 +127,12 @@ public abstract class TimelineCollector extends CompositeService {
LOG.debug("putEntities(entities=" + entities + ", callerUgi="
+ callerUgi + ")");
}
-
TimelineCollectorContext context = getTimelineEntityContext();
+
+ // Update application metrics for aggregation
+ updateAggregateStatus(entities, aggregationGroups,
+ getEntityTypesSkipAggregation());
+
return writer.write(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
context.getAppId(), entities);
@@ -117,6 +158,174 @@ public abstract class TimelineCollector extends CompositeService {
}
}
- public abstract TimelineCollectorContext getTimelineEntityContext();
+ /**
+ * Aggregate all metrics in given timeline entities with no predefined states.
+ *
+ * @param entities Entities to aggregate
+ * @param resultEntityId Id of the result entity
+ * @param resultEntityType Type of the result entity
+ * @param needsGroupIdInResult Marks if we want the aggregation group id in
+ * each aggregated metrics.
+ * @return A timeline entity that contains all aggregated TimelineMetric.
+ */
+ public static TimelineEntity aggregateEntities(
+ TimelineEntities entities, String resultEntityId,
+ String resultEntityType, boolean needsGroupIdInResult) {
+ ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+ = new ConcurrentHashMap<>();
+ updateAggregateStatus(entities, aggregationGroups, null);
+ if (needsGroupIdInResult) {
+ return aggregate(aggregationGroups, resultEntityId, resultEntityType);
+ } else {
+ return aggregateWithoutGroupId(
+ aggregationGroups, resultEntityId, resultEntityType);
+ }
+ }
+ /**
+ * Update the aggregation status table for a timeline collector.
+ *
+ * @param entities Entities to update
+ * @param aggregationGroups Aggregation status table
+ * @param typesToSkip Entity types that we can safely assume to skip updating
+ */
+ static void updateAggregateStatus(
+ TimelineEntities entities,
+ ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
+ Set<String> typesToSkip) {
+ for (TimelineEntity e : entities.getEntities()) {
+ if ((typesToSkip != null && typesToSkip.contains(e.getType()))
+ || e.getMetrics().isEmpty()) {
+ continue;
+ }
+ AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
+ if (aggrTable == null) {
+ AggregationStatusTable table = new AggregationStatusTable();
+ aggrTable = aggregationGroups.putIfAbsent(e.getType(),
+ table);
+ if (aggrTable == null) {
+ aggrTable = table;
+ }
+ }
+ aggrTable.update(e);
+ }
+ }
+
+ /**
+ * Aggregate internal status and generate timeline entities for the
+ * aggregation results.
+ *
+ * @param aggregationGroups Aggregation status table
+ * @param resultEntityId Id of the result entity
+ * @param resultEntityType Type of the result entity
+ * @return A timeline entity that contains all aggregated TimelineMetric.
+ */
+ static TimelineEntity aggregate(
+ Map<String, AggregationStatusTable> aggregationGroups,
+ String resultEntityId, String resultEntityType) {
+ TimelineEntity result = new TimelineEntity();
+ result.setId(resultEntityId);
+ result.setType(resultEntityType);
+ for (Map.Entry<String, AggregationStatusTable> entry
+ : aggregationGroups.entrySet()) {
+ entry.getValue().aggregateAllTo(result, entry.getKey());
+ }
+ return result;
+ }
+
+ /**
+ * Aggregate internal status and generate timeline entities for the
+ * aggregation results. The result metrics will not have aggregation group
+ * information.
+ *
+ * @param aggregationGroups Aggregation status table
+ * @param resultEntityId Id of the result entity
+ * @param resultEntityType Type of the result entity
+ * @return A timeline entity that contains all aggregated TimelineMetric.
+ */
+ static TimelineEntity aggregateWithoutGroupId(
+ Map<String, AggregationStatusTable> aggregationGroups,
+ String resultEntityId, String resultEntityType) {
+ TimelineEntity result = new TimelineEntity();
+ result.setId(resultEntityId);
+ result.setType(resultEntityType);
+ for (Map.Entry<String, AggregationStatusTable> entry
+ : aggregationGroups.entrySet()) {
+ entry.getValue().aggregateAllTo(result, "");
+ }
+ return result;
+ }
+
+ // Note: In memory aggregation is performed in an eventually consistent
+ // fashion.
+ private static class AggregationStatusTable {
+ // On aggregation, for each metric, aggregate all per-entity accumulated
+ // metrics. We only use the id and type for TimelineMetrics in the key set
+ // of this table.
+ private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
+ aggregateTable;
+
+ public AggregationStatusTable() {
+ aggregateTable = new ConcurrentHashMap<>();
+ }
+
+ public void update(TimelineEntity incoming) {
+ String entityId = incoming.getId();
+ for (TimelineMetric m : incoming.getMetrics()) {
+ // Skip if the metric does not need aggregation
+ if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+ continue;
+ }
+ // Update aggregateTable
+ Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
+ if (aggrRow == null) {
+ Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>();
+ aggrRow = aggregateTable.putIfAbsent(m, tempRow);
+ if (aggrRow == null) {
+ aggrRow = tempRow;
+ }
+ }
+ aggrRow.put(entityId, m);
+ }
+ }
+
+ public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
+ String aggregationGroupId) {
+ if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+ return e;
+ }
+ Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
+ if (aggrRow != null) {
+ TimelineMetric aggrMetric = new TimelineMetric();
+ if (aggregationGroupId.length() > 0) {
+ aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
+ } else {
+ aggrMetric.setId(metric.getId());
+ }
+ aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
+ Map<Object, Object> status = new HashMap<>();
+ for (TimelineMetric m : aggrRow.values()) {
+ TimelineMetric.aggregateTo(m, aggrMetric, status);
+ // getRealtimeAggregationOp returns an enum so we can directly
+ // compare with "!=".
+ if (m.getRealtimeAggregationOp()
+ != aggrMetric.getRealtimeAggregationOp()) {
+ aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+ }
+ }
+ Set<TimelineMetric> metrics = e.getMetrics();
+ metrics.remove(aggrMetric);
+ metrics.add(aggrMetric);
+ }
+ return e;
+ }
+
+ public TimelineEntity aggregateAllTo(TimelineEntity e,
+ String aggregationGroupId) {
+ for (TimelineMetric m : aggregateTable.keySet()) {
+ aggregateTo(m, e, aggregationGroupId);
+ }
+ return e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
index f0b1e47..6a1e086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
*
*/
public enum TimelineAggregationTrack {
- FLOW, USER, QUEUE
+ APP, FLOW, USER, QUEUE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
new file mode 100644
index 0000000..5b4dc50
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestTimelineCollector {
+
+ private TimelineEntities generateTestEntities(int groups, int entities) {
+ TimelineEntities te = new TimelineEntities();
+ for (int j = 0; j < groups; j++) {
+ for (int i = 0; i < entities; i++) {
+ TimelineEntity entity = new TimelineEntity();
+ String containerId = "container_1000178881110_2002_" + i;
+ entity.setId(containerId);
+ String entityType = "TEST_" + j;
+ entity.setType(entityType);
+ long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("HDFS_BYTES_WRITE");
+ m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ long ts = System.currentTimeMillis();
+ m1.addValue(ts - 20000, 100L);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId("VCORES_USED");
+ m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ m2.addValue(ts - 20000, 3L);
+ metrics.add(m2);
+
+ // m3 should not show up in the aggregation
+ TimelineMetric m3 = new TimelineMetric();
+ m3.setId("UNRELATED_VALUES");
+ m3.addValue(ts - 20000, 3L);
+ metrics.add(m3);
+
+ TimelineMetric m4 = new TimelineMetric();
+ m4.setId("TXN_FINISH_TIME");
+ m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX);
+ m4.addValue(ts - 20000, i);
+ metrics.add(m4);
+
+ entity.addMetrics(metrics);
+ te.addEntity(entity);
+ }
+ }
+
+ return te;
+ }
+
+ @Test
+ public void testAggregation() throws Exception {
+ // Test aggregation with multiple groups.
+ int groups = 3;
+ int n = 50;
+ TimelineEntities testEntities = generateTestEntities(groups, n);
+ TimelineEntity resultEntity = TimelineCollector.aggregateEntities(
+ testEntities, "test_result", "TEST_AGGR", true);
+ assertEquals(resultEntity.getMetrics().size(), groups * 3);
+
+ for (int i = 0; i < groups; i++) {
+ Set<TimelineMetric> metrics = resultEntity.getMetrics();
+ for (TimelineMetric m : metrics) {
+ if (m.getId().startsWith("HDFS_BYTES_WRITE")) {
+ assertEquals(100 * n, m.getSingleDataValue().intValue());
+ } else if (m.getId().startsWith("VCORES_USED")) {
+ assertEquals(3 * n, m.getSingleDataValue().intValue());
+ } else if (m.getId().startsWith("TXN_FINISH_TIME")) {
+ assertEquals(n - 1, m.getSingleDataValue());
+ } else {
+ fail("Unrecognized metric! " + m.getId());
+ }
+ }
+ }
+
+ // Test aggregation with a single group.
+ TimelineEntities testEntities1 = generateTestEntities(1, n);
+ TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities(
+ testEntities1, "test_result", "TEST_AGGR", false);
+ assertEquals(resultEntity1.getMetrics().size(), 3);
+
+ Set<TimelineMetric> metrics = resultEntity1.getMetrics();
+ for (TimelineMetric m : metrics) {
+ if (m.getId().equals("HDFS_BYTES_WRITE")) {
+ assertEquals(100 * n, m.getSingleDataValue().intValue());
+ } else if (m.getId().equals("VCORES_USED")) {
+ assertEquals(3 * n, m.getSingleDataValue().intValue());
+ } else if (m.getId().equals("TXN_FINISH_TIME")) {
+ assertEquals(n - 1, m.getSingleDataValue());
+ } else {
+ fail("Unrecognized metric! " + m.getId());
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 5ce7d3b..2f79daa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -25,11 +25,15 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
@@ -51,6 +55,26 @@ public class TestFileSystemTimelineWriterImpl {
entity.setCreatedTime(1425016501000L);
te.addEntity(entity);
+ TimelineMetric metric = new TimelineMetric();
+ String metricId = "CPU";
+ metric.setId(metricId);
+ metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ metric.addValue(1425016501000L, 1234567L);
+
+ TimelineEntity entity2 = new TimelineEntity();
+ String id2 = "metric";
+ String type2 = "app";
+ entity2.setId(id2);
+ entity2.setType(type2);
+ entity2.setCreatedTime(1425016503000L);
+ entity2.addMetric(metric);
+ te.addEntity(entity2);
+
+ Map<String, TimelineMetric> aggregatedMetrics =
+ new HashMap<String, TimelineMetric>();
+ aggregatedMetrics.put(metricId, metric);
+
FileSystemTimelineWriterImpl fsi = null;
try {
fsi = new FileSystemTimelineWriterImpl();
@@ -68,11 +92,27 @@ public class TestFileSystemTimelineWriterImpl {
assertTrue(f.exists() && !f.isDirectory());
List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
// ensure there's only one entity + 1 new line
- assertTrue(data.size() == 2);
+ assertTrue("data size is:" + data.size(), data.size() == 2);
String d = data.get(0);
// confirm the contents same as what was written
assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ // verify aggregated metrics
+ String fileName2 = fsi.getOutputRoot() +
+ "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
+ + type2 + "/" + id2 +
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ Path path2 = Paths.get(fileName2);
+ File file = new File(fileName2);
+ assertTrue(file.exists() && !file.isDirectory());
+ List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+ // ensure there's only one entity + 1 new line
+ assertTrue("data size is:" + data.size(), data2.size() == 2);
+ String metricToString = data2.get(0);
+ // confirm the contents same as what was written
+ assertEquals(metricToString,
+ TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+
// delete the directory
File outputDir = new File(fsi.getOutputRoot());
FileUtils.deleteDirectory(outputDir);
@@ -84,4 +124,5 @@ public class TestFileSystemTimelineWriterImpl {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ffdf08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 6b57ec4..8ab54bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -539,6 +540,26 @@ public class TestHBaseTimelineStorage {
metrics.add(m1);
entity.addMetrics(metrics);
+ // add aggregated metrics
+ TimelineEntity aggEntity = new TimelineEntity();
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ aggEntity.setId(appId);
+ aggEntity.setType(type);
+ long cTime2 = 1425016502000L;
+ long mTime2 = 1425026902000L;
+ aggEntity.setCreatedTime(cTime2);
+
+ TimelineMetric aggMetric = new TimelineMetric();
+ aggMetric.setId("MEM_USAGE");
+ Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
+ ts = System.currentTimeMillis();
+ aggMetricValues.put(ts - 120000, 102400000);
+ aggMetric.setType(Type.SINGLE_VALUE);
+ aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ aggMetric.setValues(aggMetricValues);
+ Set<TimelineMetric> aggMetrics = new HashSet<>();
+ aggMetrics.add(aggMetric);
+ entity.addMetrics(aggMetrics);
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
@@ -564,7 +585,7 @@ public class TestHBaseTimelineStorage {
Result result = new ApplicationTable().getResult(c1, conn, get);
assertTrue(result != null);
- assertEquals(15, result.size());
+ assertEquals(16, result.size());
// check the row key
byte[] row1 = result.getRow();
@@ -652,10 +673,17 @@ public class TestHBaseTimelineStorage {
assertEquals(conf, conf2);
Set<TimelineMetric> metrics2 = e1.getMetrics();
- assertEquals(metrics, metrics2);
+ assertEquals(2, metrics2.size());
for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues();
- matchMetrics(metricValues, metricValues2);
+ assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
+ metric2.getId().equals("MEM_USAGE"));
+ if (metric2.getId().equals("MAP_SLOT_MILLIS")) {
+ matchMetrics(metricValues, metricValues2);
+ }
+ if (metric2.getId().equals("MEM_USAGE")) {
+ matchMetrics(aggMetricValues, metricValues2);
+ }
}
} finally {
if (hbi != null) {
@@ -724,7 +752,6 @@ public class TestHBaseTimelineStorage {
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
-
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org