You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/06/01 16:47:30 UTC

ambari git commit: AMBARI-16969 : Provide ability in AMS to filter tracked metrics through a whitelist metric file (avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 476c774a5 -> 78cce8e23


AMBARI-16969 : Provide ability in AMS to filter tracked metrics through a whitelist metric file (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/78cce8e2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/78cce8e2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/78cce8e2

Branch: refs/heads/branch-2.4
Commit: 78cce8e23a09fd61e32383b453385750342d5ded
Parents: 476c774
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Jun 1 09:43:48 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Jun 1 09:43:52 2016 -0700

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   |  3 +
 .../timeline/HBaseTimelineMetricStore.java      |  8 +++
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 68 ++++++++++++++------
 .../timeline/TimelineMetricConfiguration.java   | 10 ++-
 .../timeline/aggregators/AggregatorUtils.java   | 41 ++++++++++++
 5 files changed, 110 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/78cce8e2/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 5a716df..1d9dd29 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -97,12 +97,14 @@ public abstract class AbstractTimelineMetricsSink {
       connection.setDoOutput(true);
 
       if (jsonData != null) {
+        LOG.info(jsonData);
         try (OutputStream os = connection.getOutputStream()) {
           os.write(jsonData.getBytes("UTF-8"));
         }
       }
 
       int statusCode = connection.getResponseCode();
+      LOG.info("statusCode:" + statusCode);
 
       if (statusCode != 200) {
         LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
@@ -117,6 +119,7 @@ public abstract class AbstractTimelineMetricsSink {
       failedCollectorConnectionsCounter.set(0);
       return true;
     } catch (IOException ioe) {
+      LOG.info(ioe.getMessage());
       StringBuilder errorMessage =
           new StringBuilder("Unable to connect to collector, " + connectUrl + "\n"
                   + "This exceptions will be ignored for next " + NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS + " times\n");

http://git-wip-us.apache.org/repos/asf/ambari/blob/78cce8e2/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 8b8b796..f1e42e4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +30,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
@@ -50,6 +52,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 
@@ -90,6 +93,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       // Initialize policies before TTL update
       hBaseAccessor.initPoliciesAndTTL();
 
+      String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, "");
+      if (!StringUtils.isEmpty(whitelistFile)) {
+        AggregatorUtils.populateMetricWhitelistFromFile(whitelistFile);
+      }
+
       defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
         LOG.info("Using group by aggregators for aggregating host and cluster metrics.");

http://git-wip-us.apache.org/repos/asf/ambari/blob/78cce8e2/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 47962cb..bc5396c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,8 +79,9 @@ import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_TABLES_DURABILITY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -166,6 +168,7 @@ public class PhoenixHBaseAccessor {
   private final int cacheCommitInterval;
   private final boolean skipBlockCacheForAggregatorsEnabled;
   private final String timelineMetricsTablesDurability;
+  private final String timelineMetricsPrecisionTableDurability;
 
   static final String HSTORE_COMPACTION_CLASS_KEY =
     "hbase.hstore.defaultengine.compactionpolicy.class";
@@ -205,7 +208,8 @@ public class PhoenixHBaseAccessor {
     this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
     this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
     this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);
-    this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_TABLES_DURABILITY, "");
+    this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, "");
+    this.timelineMetricsPrecisionTableDurability = metricsConf.get(TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, "");
 
     tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.get(PRECISION_TABLE_TTL, String.valueOf(1 * 86400)));  // 1 day
     tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.get(CONTAINER_METRICS_TTL, String.valueOf(30 * 86400)));  // 30 days
@@ -498,23 +502,45 @@ public class PhoenixHBaseAccessor {
             modifyTable = true;
           }
 
-          if (!timelineMetricsTablesDurability.isEmpty()) {
-            LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " for table : " + tableName);
-            boolean validDurability = true;
-            if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) {
-              tableDescriptor.setDurability(Durability.SKIP_WAL);
-            } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) {
-              tableDescriptor.setDurability(Durability.SYNC_WAL);
-            } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) {
-              tableDescriptor.setDurability(Durability.ASYNC_WAL);
-            } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) {
-              tableDescriptor.setDurability(Durability.FSYNC_WAL);
-            } else {
-              LOG.info("Unknown value for " + TIMELINE_METRICS_TABLES_DURABILITY + " : " + timelineMetricsTablesDurability);
-              validDurability = false;
+          if (METRICS_RECORD_TABLE_NAME.equals(tableName)) {
+            if (!timelineMetricsPrecisionTableDurability.isEmpty()) {
+              LOG.info("Setting WAL option " + timelineMetricsPrecisionTableDurability + " for table : " + tableName);
+              boolean validDurability = true;
+              if ("SKIP_WAL".equals(timelineMetricsPrecisionTableDurability)) {
+                tableDescriptor.setDurability(Durability.SKIP_WAL);
+              } else if ("SYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
+                tableDescriptor.setDurability(Durability.SYNC_WAL);
+              } else if ("ASYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
+                tableDescriptor.setDurability(Durability.ASYNC_WAL);
+              } else if ("FSYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
+                tableDescriptor.setDurability(Durability.FSYNC_WAL);
+              } else {
+                LOG.info("Unknown value for " + TIMELINE_METRICS_PRECISION_TABLE_DURABILITY + " : " + timelineMetricsPrecisionTableDurability);
+                validDurability = false;
+              }
+              if (validDurability) {
+                modifyTable = true;
+              }
             }
-            if (validDurability) {
-              modifyTable = true;
+          } else {
+            if (!timelineMetricsTablesDurability.isEmpty()) {
+              LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " for table : " + tableName);
+              boolean validDurability = true;
+              if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) {
+                tableDescriptor.setDurability(Durability.SKIP_WAL);
+              } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) {
+                tableDescriptor.setDurability(Durability.SYNC_WAL);
+              } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) {
+                tableDescriptor.setDurability(Durability.ASYNC_WAL);
+              } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) {
+                tableDescriptor.setDurability(Durability.FSYNC_WAL);
+              } else {
+                LOG.info("Unknown value for " + TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY + " : " + timelineMetricsTablesDurability);
+                validDurability = false;
+              }
+              if (validDurability) {
+                modifyTable = true;
+              }
             }
           }
 
@@ -680,6 +706,12 @@ public class PhoenixHBaseAccessor {
       return;
     }
     for (TimelineMetric tm: timelineMetrics) {
+
+      if (CollectionUtils.isNotEmpty(AggregatorUtils.whitelistedMetrics) &&
+        !AggregatorUtils.whitelistedMetrics.contains(tm.getMetricName())) {
+        continue;
+      }
+
       // Write to metadata cache on successful write to store
       if (metadataManager != null) {
         metadataManager.putIfModifiedTimelineMetricMetadata(

http://git-wip-us.apache.org/repos/asf/ambari/blob/78cce8e2/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index cb78e02..b91b43a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -231,8 +231,14 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED =
     "timeline.metrics.cluster.aggregator.interpolation.enabled";
 
-  public static final String TIMELINE_METRICS_TABLES_DURABILITY =
-    "timeline.metrics.tables.durability";
+  public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
+    "timeline.metrics.precision.table.durability";
+
+  public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
+      "timeline.metrics.aggregate.tables.durability";
+
+  public static final String TIMELINE_METRICS_WHITELIST_FILE =
+    "timeline.metrics.whitelist.file";
 
   public static final String HBASE_BLOCKING_STORE_FILES =
     "hbase.hstore.blockingStoreFiles";

http://git-wip-us.apache.org/repos/asf/ambari/blob/78cce8e2/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
index ce79b6f..9e41c87 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
@@ -18,13 +18,26 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  *
  */
 public class AggregatorUtils {
 
+  public static Set<String> whitelistedMetrics = new HashSet<String>();
+  private static final Log LOG = LogFactory.getLog(AggregatorUtils.class);
+
   public static double[] calculateAggregates(Map<Long, Double> metricValues) {
     double[] values = new double[4];
     double max = Double.MIN_VALUE;
@@ -56,4 +69,32 @@ public class AggregatorUtils {
     return values;
   }
 
+  /**
+   * Reads metric names, one per line, from {@code whitelistFile} and adds them
+   * to {@link #whitelistedMetrics}. Each line is trimmed; blank lines are skipped.
+   * An {@link IOException} is logged and not rethrown, so whatever was read
+   * before the failure stays in the set.
+   *
+   * @param whitelistFile path of the whitelist file to read
+   */
+  public static void populateMetricWhitelistFromFile(String whitelistFile) {
+    // try-with-resources closes the reader on every path; previously it was
+    // closed only after a failure, leaking the file handle on success.
+    try (BufferedReader br = new BufferedReader(
+        new InputStreamReader(new FileInputStream(whitelistFile)))) {
+      String strLine;
+      while ((strLine = br.readLine()) != null) {
+        strLine = strLine.trim();
+        if (!strLine.isEmpty()) {
+          whitelistedMetrics.add(strLine);
+        }
+      }
+    } catch (IOException ioEx) {
+      // Pass the exception along so the cause (missing file, permissions) is visible.
+      LOG.error("Unable to parse metric whitelist file", ioEx);
+    }
+    LOG.info("Whitelisting " + whitelistedMetrics.size() + " metrics");
+    LOG.debug("Whitelisted metrics : " + Arrays.toString(whitelistedMetrics.toArray()));
+  }
+
 }