You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:49 UTC
[23/28] ambari git commit: AMBARI-22740 : Fix integration test for
HBase in branch-3.0-ams due to UUID changes. (avijayan)
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
new file mode 100644
index 0000000..69122f9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+
+public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond {
+ private TimelineMetricDistributedCache distributedCache;
+ public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, String aggregatorDisabledParam, String inputTableName, String outputTableName,
+ Long nativeTimeRangeDelay,
+ Long timeSliceInterval,
+ MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache) {
+ super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
+ this.distributedCache = distributedCache;
+ }
+
+ @Override
+ public boolean doWork(long startTime, long endTime) {
+ LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+ "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
+ try {
+ Map<String, Double> caheMetrics;
+ if (LOG.isDebugEnabled()) {
+ caheMetrics = distributedCache.getPointInTimeCacheMetrics();
+ LOG.debug("Ignite metrics before eviction : " + caheMetrics);
+ }
+
+ LOG.info("Trying to evict elements from cache");
+ Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = distributedCache.evictMetricAggregates(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment);
+ LOG.info(String.format("Evicted %s elements from cache.", metricsFromCache.size()));
+
+ if (LOG.isDebugEnabled()) {
+ caheMetrics = distributedCache.getPointInTimeCacheMetrics();
+ LOG.debug("Ignite metrics after eviction : " + caheMetrics);
+ }
+
+ List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
+ Map<TimelineClusterMetric, MetricClusterAggregate> result = aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeSlices);
+
+ LOG.info("Saving " + result.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(result);
+ LOG.info("End aggregation cycle @ " + new Date());
+ return true;
+ } catch (Exception e) {
+ LOG.error("Exception during aggregation. ", e);
+ return false;
+ }
+ }
+
+ //Slices in cache could be different from aggregate slices, so need to recalculate. Counts hosted apps
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
+ //TODO add basic interpolation
+ //TODO investigate if needed, maybe add config to disable/enable
+ //count hosted apps
+ Map<String, MutableInt> hostedAppCounter = new HashMap<>();
+ for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
+ int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
+ String appId = clusterMetricAggregateEntry.getKey().getAppId();
+ if (!hostedAppCounter.containsKey(appId)) {
+ hostedAppCounter.put(appId, new MutableInt(numHosts));
+ } else {
+ int currentHostCount = hostedAppCounter.get(appId).intValue();
+ if (currentHostCount < numHosts) {
+ hostedAppCounter.put(appId, new MutableInt(numHosts));
+ }
+ }
+ }
+
+ // Add liveHosts per AppId metrics.
+ processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]);
+
+ return metricsFromCache;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
new file mode 100644
index 0000000..371d9fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+
+public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);
+ private TimelineMetricMetadataManager metricMetadataManager;
+ private ConcurrentHashMap<String, Long> postedAggregatedMap;
+
+ public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+ TimelineMetricMetadataManager metricMetadataManager,
+ PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ MetricCollectorHAController haController,
+ ConcurrentHashMap<String, Long> postedAggregatedMap) {
+ super(aggregatorName, metricMetadataManager,
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ tableName,
+ outputTableName,
+ nativeTimeRangeDelay,
+ haController);
+ this.metricMetadataManager = metricMetadataManager;
+ this.postedAggregatedMap = postedAggregatedMap;
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ List<String> aggregatedHostnames = new ArrayList<>();
+ for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
+ if (entry.getValue() > startTime && entry.getValue() <= endTime) {
+ aggregatedHostnames.add(entry.getKey());
+ }
+ }
+ List<String> notAggregatedHostnames = metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedHostnames);
+ LOG.debug("Hostnames that will be aggregated : " + notAggregatedHostnames);
+ }
+ List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), notAggregatedHostnames, "", "");
+
+ Condition condition = new DefaultCondition(uuids, null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ // Retaining order of the row-key avoids client side merge sort.
+ condition.addOrderByColumn("UUID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..c25d6ce
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
+ TimelineMetricReadHelper readHelper;
+
+ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
+ TimelineMetricMetadataManager metricMetadataManager,
+ PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ MetricCollectorHAController haController) {
+ super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+ sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
+ readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ // Retaining order of the row-key avoids client side merge sort.
+ condition.addOrderByColumn("UUID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
+ throws IOException, SQLException {
+ TimelineMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<TimelineMetric, MetricHostAggregate>();
+
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ readHelper.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ readHelper.getMetricHostAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ currentMetric.setStartTime(endTime);
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ hostAggregate.updateAggregates(currentHostAggregate);
+ } else {
+ // Switched over to a new metric - save existing - create new aggregate
+ currentMetric.setStartTime(endTime);
+ hostAggregate = new MetricHostAggregate();
+ hostAggregate.updateAggregates(currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+ }
+ return hostAggregateMap;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
new file mode 100644
index 0000000..a56c7aa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+public class TimelineMetricReadHelper {
+
+ private boolean ignoreInstance = false;
+ private TimelineMetricMetadataManager metadataManagerInstance = null;
+
+ public TimelineMetricReadHelper() {}
+
+ public TimelineMetricReadHelper(boolean ignoreInstance) {
+ this.ignoreInstance = ignoreInstance;
+ }
+
+ public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) {
+ this.metadataManagerInstance = timelineMetricMetadataManager;
+ }
+
+ public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) {
+ this.metadataManagerInstance = timelineMetricMetadataManager;
+ this.ignoreInstance = ignoreInstance;
+ }
+
+ public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+ TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
+ metric.setMetricValues(sortedByTimeMetrics);
+ return metric;
+ }
+
+ public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
+ Function f) throws SQLException, IOException {
+
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
+ Function function = (f != null) ? f : Function.DEFAULT_VALUE_FUNCTION;
+ SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
+ timelineMetric.getMetricName() + function.getSuffix(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timelineMetric.getHostName(),
+ rs.getLong("SERVER_TIME")
+ );
+
+ double value;
+ switch(function.getReadFunction()){
+ case AVG:
+ value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+ break;
+ case MIN:
+ value = rs.getDouble("METRIC_MIN");
+ break;
+ case MAX:
+ value = rs.getDouble("METRIC_MAX");
+ break;
+ case SUM:
+ value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+ break;
+ default:
+ value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+ break;
+ }
+
+ metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
+
+ return metric;
+ }
+
+ /**
+ * Returns common part of timeline metrics record without the values.
+ */
+ public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
+ throws SQLException {
+
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
+ if (ignoreInstance) {
+ metric.setInstanceId(null);
+ }
+ metric.setStartTime(rs.getLong("SERVER_TIME"));
+ return metric;
+ }
+
+ public MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricClusterAggregate agg = new MetricClusterAggregate();
+ agg.setSum(rs.getDouble("METRIC_SUM"));
+ agg.setMax(rs.getDouble("METRIC_MAX"));
+ agg.setMin(rs.getDouble("METRIC_MIN"));
+ agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+ agg.setDeviation(0.0);
+
+ return agg;
+ }
+
+ public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricClusterAggregate agg = new MetricClusterAggregate();
+ agg.setSum(rs.getDouble("METRIC_SUM"));
+ agg.setMax(rs.getDouble("METRIC_MAX"));
+ agg.setMin(rs.getDouble("METRIC_MIN"));
+ agg.setNumberOfHosts(rs.getInt("METRIC_COUNT"));
+
+ agg.setDeviation(0.0);
+
+ return agg;
+ }
+
+ public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException {
+
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
+
+ return new TimelineClusterMetric(
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ ignoreInstance ? null : timelineMetric.getInstanceId(),
+ rs.getLong("SERVER_TIME"));
+ }
+
+ public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+ metricHostAggregate.setDeviation(0.0);
+ return metricHostAggregate;
+ }
+
+ public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ byte[] uuid = rs.getBytes("UUID");
+ return metadataManagerInstance.getMetricFromUuid(uuid);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
new file mode 100644
index 0000000..e28d465
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.query.TopNCondition;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+
+public class TopNDownSampler implements CustomDownSampler {
+
+ private TopNConfig topNConfig;
+ private static final Log LOG = LogFactory.getLog(TopNDownSampler.class);
+ protected String metricPatterns;
+
+ public static TopNDownSampler fromConfig(Map<String, String> conf) {
+ String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn." +
+ DownSamplerUtils.downSamplerMetricPatternsConfig);
+
+ String topNString = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.value");
+ Integer topNValue = topNString != null ? Integer.valueOf(topNString) : 10;
+ String topNFunction = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.function");
+
+ return new TopNDownSampler(new TopNConfig(topNValue, topNFunction, false), metricPatterns);
+ }
+
+ public TopNDownSampler(TopNConfig topNConfig, String metricPatterns) {
+ this.topNConfig = topNConfig;
+ this.metricPatterns = metricPatterns;
+ }
+
+ @Override
+ public boolean validateConfigs() {
+ if (topNConfig == null) {
+ return false;
+ }
+
+ if (topNConfig.getTopN() <= 0) {
+ return false;
+ }
+
+ if (StringUtils.isEmpty(metricPatterns)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Prepare downsampling SELECT statement(s) used to determine the data to be written into the Aggregate table.
+ * @param startTime
+ * @param endTime
+ * @param tableName
+ * @return
+ */
+ @Override
+ public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName) {
+ List<String> stmts = new ArrayList<>();
+
+ Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+ Function function = new Function(readFunction, null);
+ String columnSelect = TopNCondition.getColumnSelect(function);
+
+ List<String> metricPatternList = Arrays.asList(metricPatterns.split(","));
+
+ for (String metricPattern : metricPatternList) {
+ String metricPatternClause = "'" + metricPattern + "'";
+ //TODO : Need a better way to find out what kind of aggregation the current one is.
+ if (tableName.contains("RECORD")) {
+ stmts.add(String.format(TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL,
+ endTime, columnSelect, columnSelect, columnSelect, tableName, metricPatternClause,
+ startTime, endTime, columnSelect, topNConfig.getTopN()));
+ } else {
+ stmts.add(String.format(TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL,
+ endTime, columnSelect, columnSelect, columnSelect, tableName, metricPatternClause,
+ startTime, endTime, columnSelect, topNConfig.getTopN()));
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DownSampling Stmt: " + stmts.toString());
+ }
+
+ return stmts;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..9c255e7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ private final String aggregateColumnName;
+
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+ PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String inputTableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ MetricCollectorHAController haController) {
+ super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+ sleepIntervalMillis, checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam, inputTableName, outputTableName,
+ nativeTimeRangeDelay, haController);
+
+ if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+ aggregateColumnName = "HOSTS_COUNT";
+ } else {
+ aggregateColumnName = "METRIC_COUNT";
+ }
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ EmptyCondition condition = new EmptyCondition();
+ condition.setDoUpdate(true);
+
+ /*
+ UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
+ SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
+ SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
+ SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
+ FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
+ SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
+ */
+
+ condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
+ outputTableName, endTime, aggregateColumnName, tableName,
+ getDownsampledMetricSkipClause(), startTime, endTime));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Condition: " + condition.toString());
+ }
+
+ return condition;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+ LOG.info("Aggregated cluster metrics for " + outputTableName +
+ ", with startTime = " + new Date(startTime) +
+ ", endTime = " + new Date(endTime));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
new file mode 100644
index 0000000..1026cbe
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
+ private TimelineMetricMetadataManager metricMetadataManager;
+ private ConcurrentHashMap<String, Long> postedAggregatedMap;
+
+ public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+ TimelineMetricMetadataManager metricMetadataManager,
+ PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ MetricCollectorHAController haController,
+ ConcurrentHashMap<String, Long> postedAggregatedMap) {
+ super(aggregatorName,
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ tableName,
+ outputTableName,
+ nativeTimeRangeDelay,
+ haController);
+ this.metricMetadataManager = metricMetadataManager;
+ this.postedAggregatedMap = postedAggregatedMap;
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ List<String> aggregatedHostnames = new ArrayList<>();
+ for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
+ if (entry.getValue() > startTime && entry.getValue() <= endTime) {
+ aggregatedHostnames.add(entry.getKey());
+ }
+ }
+ List<String> notAggregatedHostnames = metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedHostnames);
+ LOG.debug("Hostnames that will be aggregated : " + notAggregatedHostnames);
+ }
+ List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), notAggregatedHostnames, "", "");
+
+ EmptyCondition condition = new EmptyCondition();
+ condition.setDoUpdate(true);
+
+ condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+ outputTableName, endTime, tableName,
+ getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Condition: " + condition.toString());
+ }
+
+ return condition;
+ }
+
+ private String getIncludedUuidsClause(List<byte[]> uuids) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(");
+
+ //LIKE clause
+ // (UUID LIKE ? OR UUID LIKE ?) AND
+ if (CollectionUtils.isNotEmpty(uuids)) {
+ for (int i = 0; i < uuids.size(); i++) {
+ sb.append("UUID");
+ sb.append(" LIKE ");
+ sb.append("'%");
+ sb.append(new String(uuids.get(i)));
+ sb.append("'");
+
+ if (i == uuids.size() - 1) {
+ sb.append(") AND ");
+ } else {
+ sb.append(" OR ");
+ }
+ }
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..9e8df6d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+
+ public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+ PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ MetricCollectorHAController haController) {
+ super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+ sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+
+ LOG.info("Aggregated host metrics for " + outputTableName +
+ ", with startTime = " + new Date(startTime) +
+ ", endTime = " + new Date(endTime));
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ EmptyCondition condition = new EmptyCondition();
+ condition.setDoUpdate(true);
+
+ condition.setStatement(String.format(PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+ outputTableName, endTime, tableName,
+ getDownsampledMetricSkipClause(), startTime, endTime));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Condition: " + condition.toString());
+ }
+
+ return condition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
new file mode 100644
index 0000000..9a27d55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.StateMachineEngine;
+
+public class AggregationTaskRunner {
+ private final String instanceName;
+ private final String zkAddress;
+ private final String clusterName;
+ private HelixManager manager;
+ private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
+ private CheckpointManager checkpointManager;
+ // Map partition name to an aggregator dimension
+ static final Map<String, TimelineMetricAggregator.AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
+ // Ownership flags to be set by the State transitions
+ private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
+ private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
+
+ public enum AGGREGATOR_NAME {
+ METRIC_RECORD_MINUTE,
+ METRIC_RECORD_HOURLY,
+ METRIC_RECORD_DAILY,
+ METRIC_AGGREGATE_SECOND,
+ METRIC_AGGREGATE_MINUTE,
+ METRIC_AGGREGATE_HOURLY,
+ METRIC_AGGREGATE_DAILY,
+ }
+
+ public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
+
+ static {
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
+ ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
+
+ // Partition name to task assignment
+ PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_0", TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER);
+ PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_1", TimelineMetricAggregator.AGGREGATOR_TYPE.HOST);
+ }
+
+ public AggregationTaskRunner(String instanceName, String zkAddress, String clusterName) {
+ this.instanceName = instanceName;
+ this.zkAddress = zkAddress;
+ this.clusterName = clusterName;
+ }
+
+ public void initialize() throws Exception {
+ manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+ InstanceType.PARTICIPANT, zkAddress);
+
+ OnlineOfflineStateModelFactory stateModelFactory =
+ new OnlineOfflineStateModelFactory(instanceName, this);
+
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory(MetricCollectorHAController.DEFAULT_STATE_MODEL, stateModelFactory);
+ manager.connect();
+
+ checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
+ }
+
+ public boolean performsClusterAggregation() {
+ return performsClusterAggregation.get();
+ }
+
+ public boolean performsHostAggregation() {
+ return performsHostAggregation.get();
+ }
+
+ public CheckpointManager getCheckpointManager() {
+ return checkpointManager;
+ }
+
+ public void setPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
+ switch (type) {
+ case HOST:
+ performsHostAggregation.set(true);
+ LOG.info("Set host aggregator function for : " + instanceName);
+ break;
+ case CLUSTER:
+ performsClusterAggregation.set(true);
+ LOG.info("Set cluster aggregator function for : " + instanceName);
+ }
+ }
+
+ public void unsetPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
+ switch (type) {
+ case HOST:
+ performsHostAggregation.set(false);
+ LOG.info("Unset host aggregator function for : " + instanceName);
+ break;
+ case CLUSTER:
+ performsClusterAggregation.set(false);
+ LOG.info("Unset cluster aggregator function for : " + instanceName);
+ }
+ }
+
+ /**
+ * Disconnect participant before controller shutdown
+ */
+ void stop() {
+ manager.disconnect();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
new file mode 100644
index 0000000..868fb93
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+public class CheckpointManager {
+ private final ZkHelixPropertyStore<ZNRecord> propertyStore;
+ private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
+
+ static final String ZNODE_FIELD = "checkpoint";
+ static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
+
+ public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ this.propertyStore = propertyStore;
+ }
+
+ /**
+ * Read aggregator checkpoint from zookeeper
+ *
+ * @return timestamp
+ */
+ public long readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
+ String path = getCheckpointZKPath(aggregatorName);
+ LOG.debug("Reading checkpoint at " + path);
+ Stat stat = new Stat();
+ ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stat => " + stat);
+ }
+ long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
+ LOG.debug("Checkpoint value = " + checkpoint);
+ return checkpoint;
+ }
+
+ /**
+ * Write aggregator checkpoint in zookeeper
+ *
+ * @param value timestamp
+ * @return sucsess
+ */
+ public boolean writeCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, long value) {
+ String path = getCheckpointZKPath(aggregatorName);
+ LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
+ return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
+ }
+
+ static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
+ final String path;
+ final Long value;
+
+ public CheckpointDataUpdater(String path, Long value) {
+ this.path = path;
+ this.value = value;
+ }
+
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ currentData = new ZNRecord(path);
+ }
+ currentData.setLongField(ZNODE_FIELD, value);
+ return currentData;
+ }
+ }
+
+ String getCheckpointZKPath(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
+ StringBuilder sb = new StringBuilder("/");
+ sb.append(CHECKPOINT_PATH_PREFIX);
+ sb.append("/");
+ sb.append(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
new file mode 100644
index 0000000..ee28d87
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+import com.google.common.base.Joiner;
+
+;
+
+public class MetricCollectorHAController {
+ private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
+
+ static final String CLUSTER_NAME = "ambari-metrics-cluster";
+ static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+ static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
+ static final String INSTANCE_NAME_DELIMITER = "_";
+
+ final String zkConnectUrl;
+ final String instanceHostname;
+ final InstanceConfig instanceConfig;
+ final AggregationTaskRunner aggregationTaskRunner;
+ final TimelineMetricConfiguration configuration;
+
+ // Cache list of known live instances
+ final List<String> liveInstanceNames = new ArrayList<>();
+
+ // Helix Admin
+ HelixAdmin admin;
+ // Helix Manager
+ HelixManager manager;
+
+ private volatile boolean isInitialized = false;
+
+ public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
+ this.configuration = configuration;
+ String instancePort;
+ try {
+ instanceHostname = configuration.getInstanceHostnameFromEnv();
+ instancePort = configuration.getInstancePort();
+
+ } catch (Exception e) {
+ LOG.error("Error reading configs from classpath, will resort to defaults.", e);
+ throw new MetricsSystemInitializationException(e.getMessage());
+ }
+
+ try {
+ String zkClientPort = configuration.getClusterZKClientPort();
+ String zkQuorum = configuration.getClusterZKQuorum();
+
+ if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+ throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+ + zkClientPort +", quorum = " + zkQuorum);
+ }
+
+ zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
+
+ } catch (Exception e) {
+ LOG.error("Unable to load hbase-site from classpath.", e);
+ throw new MetricsSystemInitializationException(e.getMessage());
+ }
+
+ instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
+ instanceConfig.setHostName(instanceHostname);
+ instanceConfig.setPort(instancePort);
+ instanceConfig.setInstanceEnabled(true);
+ aggregationTaskRunner = new AggregationTaskRunner(
+ instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
+ }
+
+ /**
+ * Name of Helix znode
+ */
+ public String getClusterName() {
+ return CLUSTER_NAME;
+ }
+
+ /**
+ * Initialize the instance with zookeeper via Helix
+ */
+ public void initializeHAController() throws Exception {
+ String clusterName = getClusterName();
+ admin = new ZKHelixAdmin(zkConnectUrl);
+ // create cluster
+ LOG.info("Creating zookeeper cluster node: " + clusterName);
+ boolean clusterAdded = admin.addCluster(clusterName, false);
+ LOG.info("Was cluster added successfully? " + clusterAdded);
+
+ // Adding host to the cluster
+ boolean success = false;
+ int tries = 5;
+ int sleepTimeInSeconds = 5;
+
+ for (int i = 0; i < tries && !success; i++) {
+ try {
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info("Adding participant instance " + instanceConfig);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ success = true;
+ } catch (HelixException | ZkNoNodeException ex) {
+ LOG.warn("Helix Cluster not yet setup fully.");
+ if (i < tries - 1) {
+ LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+ TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
+ } else {
+ LOG.error(ex);
+ }
+ }
+ }
+
+ if (!success) {
+ LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
+ admin.addCluster(clusterName, true);
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info("Adding participant instance " + instanceConfig);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ }
+
+ // Add a state model
+ if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
+ LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+ admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
+ StateModelConfigGenerator.generateConfigForOnlineOffline()));
+ }
+
+ // Add resources with 1 cluster-wide replica
+ // Since our aggregators are unbalanced in terms of work distribution we
+ // only need to distribute writes to METRIC_AGGREGATE and
+ // METRIC_RECORD_MINUTE
+ List<String> resources = admin.getResourcesInCluster(clusterName);
+ if (!resources.contains(METRIC_AGGREGATORS)) {
+ LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
+ admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
+ }
+ // this will set up the ideal state, it calculates the preference list for
+ // each partition similar to consistent hashing
+ admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
+
+ // Start participant
+ startAggregators();
+
+ // Start controller
+ startController();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ aggregationTaskRunner.stop();
+ manager.disconnect();
+ }
+ });
+
+ isInitialized = true;
+ }
+
+ /**
+ * Return true if HA controller is enabled.
+ */
+ public boolean isInitialized() {
+ return isInitialized;
+ }
+
+ private void startAggregators() {
+ try {
+ aggregationTaskRunner.initialize();
+
+ } catch (Exception e) {
+ LOG.error("Unable to start aggregators.", e);
+ throw new MetricsSystemInitializationException(e.getMessage());
+ }
+ }
+
+ private void startController() throws Exception {
+ manager = HelixManagerFactory.getZKHelixManager(
+ getClusterName(),
+ instanceHostname,
+ InstanceType.CONTROLLER,
+ zkConnectUrl
+ );
+
+ manager.connect();
+ HelixController controller = new HelixController();
+ manager.addLiveInstanceChangeListener(controller);
+ }
+
+ public AggregationTaskRunner getAggregationTaskRunner() {
+ return aggregationTaskRunner;
+ }
+
+ public List<String> getLiveInstanceHostNames() {
+ List<String> liveInstanceHostNames = new ArrayList<>();
+
+ for (String instance : liveInstanceNames) {
+ liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
+ }
+
+ return liveInstanceHostNames;
+ }
+
+ public class HelixController extends GenericHelixController {
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ Joiner joiner = Joiner.on(", ").skipNulls();
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+ super.onLiveInstanceChange(liveInstances, changeContext);
+
+ liveInstanceNames.clear();
+ for (LiveInstance instance : liveInstances) {
+ liveInstanceNames.add(instance.getInstanceName());
+ }
+
+ LOG.info("Detected change in liveliness of Collector instances. " +
+ "LiveIsntances = " + joiner.join(liveInstanceNames));
+ // Print HA state - after some delay
+ executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ printClusterState();
+ }
+ }, 30, TimeUnit.SECONDS);
+
+
+ }
+ }
+
+ public void printClusterState() {
+ StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
+
+ ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
+ if (resourceExternalView != null) {
+ getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+ }
+ sb.append("\n##################################################");
+ LOG.info(sb.toString());
+ }
+
+ private void getPrintableResourceState(ExternalView resourceExternalView,
+ String resourceName,
+ StringBuilder sb) {
+ TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
+ sb.append("\nCLUSTER: ");
+ sb.append(getClusterName());
+ sb.append("\nRESOURCE: ");
+ sb.append(resourceName);
+ for (String partitionName : sortedSet) {
+ sb.append("\nPARTITION: ");
+ sb.append(partitionName).append("\t");
+ Map<String, String> states = resourceExternalView.getStateMap(partitionName);
+ for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+ sb.append("\t");
+ sb.append(stateEntry.getKey());
+ sb.append("\t");
+ sb.append(stateEntry.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..78a3199
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+ private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
+ private final String instanceName;
+ private final AggregationTaskRunner taskRunner;
+
+ public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
+ this.instanceName = instanceName;
+ this.taskRunner = taskRunner;
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String partition) {
+ LOG.info("Received request to process partition = " + partition + ", for " +
+ "resource = " + resourceName + ", at " + instanceName);
+ return new OnlineOfflineStateModel();
+ }
+
+ public class OnlineOfflineStateModel extends StateModel {
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ String partitionName = message.getPartitionName();
+ LOG.info("Received transition to Online from Offline for partition: " + partitionName);
+ TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+ taskRunner.setPartitionAggregationFunction(type);
+ }
+
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ String partitionName = message.getPartitionName();
+ LOG.info("Received transition to Offline from Online for partition: " + partitionName);
+ TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+ taskRunner.unsetPartitionAggregationFunction(type);
+ }
+
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ String partitionName = message.getPartitionName();
+ LOG.info("Received transition to Dropped from Offline for partition: " + partitionName);
+ TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+ taskRunner.unsetPartitionAggregationFunction(type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
new file mode 100644
index 0000000..7645bd0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TimelineMetricHostMetadata {
+ //need concurrent data structure, only keys are used.
+ private ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>();
+ private byte[] uuid;
+
+ // Default constructor
+ public TimelineMetricHostMetadata() {
+ }
+
+ public TimelineMetricHostMetadata(ConcurrentHashMap<String, String> hostedApps) {
+ this.hostedApps = hostedApps;
+ }
+
+ public TimelineMetricHostMetadata(Set<String> hostedApps) {
+ ConcurrentHashMap<String, String> appIdsMap = new ConcurrentHashMap<>();
+ for (String appId : hostedApps) {
+ appIdsMap.put(appId, appId);
+ }
+ this.hostedApps = appIdsMap;
+ }
+
+ public ConcurrentHashMap<String, String> getHostedApps() {
+ return hostedApps;
+ }
+
+ public void setHostedApps(ConcurrentHashMap<String, String> hostedApps) {
+ this.hostedApps = hostedApps;
+ }
+
+ public byte[] getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(byte[] uuid) {
+ this.uuid = uuid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
new file mode 100644
index 0000000..d308ce1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.lang3.StringUtils;
+
+@XmlRootElement
+public class TimelineMetricMetadataKey {
+ String metricName;
+ String appId;
+ String instanceId;
+
+ public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o;
+
+ if (!metricName.equals(that.metricName)) return false;
+ if (!appId.equals(that.appId)) return false;
+ return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineMetricMetadataKey{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ '}';
+ }
+}