You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/09/23 02:40:05 UTC

[2/2] ambari git commit: AMBARI-13173. Provide Kafka metrics filter to reduce write volume. (swagle)

AMBARI-13173. Provide Kafka metrics filter to reduce write volume. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b1fb3e3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b1fb3e3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b1fb3e3

Branch: refs/heads/branch-2.1.2
Commit: 3b1fb3e3452c1f9c6f81202012af58874b8e370c
Parents: 69cce9c
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Sep 22 17:39:44 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Sep 22 17:40:02 2015 -0700

----------------------------------------------------------------------
 .../kafka/KafkaTimelineMetricsReporter.java     |   72 +-
 .../kafka/KafkaTimelineMetricsReporterTest.java |   66 +-
 .../sink/kafka/ScheduledReporterTest.java       |   31 +-
 .../ambari-metrics-storm-sink/pom.xml           |    2 +-
 .../sink/storm/StormTimelineMetricsSink.java    |    2 +-
 .../server/upgrade/UpgradeCatalog212.java       |   19 +-
 .../package/files/service-metrics/KAFKA.txt     | 1058 ++----------------
 .../KAFKA/configuration/kafka-broker.xml        |   14 +
 8 files changed, 225 insertions(+), 1039 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index a259864..dd7604b 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -33,8 +33,9 @@ import com.yammer.metrics.stats.Snapshot;
 import kafka.metrics.KafkaMetricsConfig;
 import kafka.metrics.KafkaMetricsReporter;
 import kafka.utils.VerifiableProperties;
-import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
@@ -46,7 +47,11 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +69,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
   private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
   private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
+  private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
+  private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "8188";
 
@@ -76,6 +83,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
 
+  private String[] excludedMetricsPrefixes;
+  private String[] includedMetricsPrefixes;
+  // Local cache to avoid prefix matching everytime
+  private Set<String> excludedMetrics = new HashSet<>();
+
   @Override
   protected String getCollectorUri() {
     return collectorUri;
@@ -108,14 +120,28 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
         String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
         collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
+
+        // Exclusion policy
+        String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
+        if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
+          excludedMetricsPrefixes = excludedMetricsStr.trim().split(",");
+        }
+        // Inclusion override
+        String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
+        if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
+          includedMetricsPrefixes = includedMetricsStr.trim().split(",");
+        }
+
         initializeReporter();
         if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
           startReporter(metricsConfig.pollingIntervalSecs());
         }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("CollectorUri = " + collectorUri);
-          LOG.trace("MetricsSendInterval = " + metricsSendInterval);
-          LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CollectorUri = " + collectorUri);
+          LOG.debug("MetricsSendInterval = " + metricsSendInterval);
+          LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
+          LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
+          LOG.debug("Included metrics prefixes = " + includedMetricsStr);
         }
       }
     }
@@ -156,6 +182,24 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     public List<TimelineMetric> getTimelineMetricList();
   }
 
+  protected boolean isExcludedMetric(String metricName) {
+    if (excludedMetrics.contains(metricName)) {
+      return true;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("metricName => " + metricName +
+        ", exclude: " + StringUtils.startsWithAny(metricName, excludedMetricsPrefixes) +
+        ", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
+    }
+    if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
+      if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) {
+        excludedMetrics.add(metricName);
+        return true;
+      }
+    }
+    return false;
+  }
+
   class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
 
     private static final String APP_ID = "kafka_broker";
@@ -187,21 +231,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
           final MetricName metricName = entry.getKey();
           final Metric metric = entry.getValue();
           Context context = new Context() {
-
             public List<TimelineMetric> getTimelineMetricList() {
               return metricsList;
             }
-
           };
           metric.processWith(this, metricName, context);
         }
       } catch (Throwable t) {
         LOG.error("Exception processing Kafka metric", t);
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Metrics List size: " + metricsList.size());
-        LOG.trace("Metics Set size: " + metrics.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Metrics List size: " + metricsList.size());
+        LOG.debug("Metics Set size: " + metrics.size());
+        LOG.debug("Excluded metrics set: " + excludedMetrics);
       }
+
       if (!metricsList.isEmpty()) {
         TimelineMetrics timelineMetrics = new TimelineMetrics();
         timelineMetrics.setMetrics(metricsList);
@@ -333,7 +377,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
       final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
           NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
       final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
-          NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
+        NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
 
       return new String[] { medianName,
           ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName,
@@ -343,7 +387,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) {
       final String meterName = sanitizedName + suffix;
       final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue);
-      metricsCache.putTimelineMetric(metric);
+      // Skip cache if we decide not to include the metric
+      // Cannot do this before calculations of percentiles
+      if (!isExcludedMetric(meterName)) {
+        metricsCache.putTimelineMetric(metric);
+      }
       return meterName;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index 67c61e1..70f4850 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -18,18 +18,16 @@
 
 package org.apache.hadoop.metrics2.sink.kafka;
 
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import junit.framework.Assert;
 import kafka.utils.VerifiableProperties;
-
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
@@ -40,19 +38,19 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ Metrics.class, HttpClient.class })
-@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ Metrics.class, HttpClient.class,
+  KafkaTimelineMetricsReporter.TimelineScheduledReporter.class })
+@PowerMockIgnore({"javax.management.*", "org.apache.log4j.*", "org.slf4j.*"})
 public class KafkaTimelineMetricsReporterTest {
 
   private final List<Metric> list = new ArrayList<Metric>();
@@ -81,6 +79,8 @@ public class KafkaTimelineMetricsReporterTest {
     properties.setProperty("kafka.timeline.metrics.host", "localhost");
     properties.setProperty("kafka.timeline.metrics.port", "8188");
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+    properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
+    properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
     props = new VerifiableProperties(properties);
   }
 
@@ -98,6 +98,27 @@ public class KafkaTimelineMetricsReporterTest {
     verifyAll();
   }
 
+  @Test
+  public void testMetricsExclusionPolicy() throws Exception {
+    mockStatic(Metrics.class);
+    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    kafkaTimelineMetricsReporter.setHttpClient(httpClient);
+
+    replay(Metrics.class, httpClient, timelineMetricsCache);
+    kafkaTimelineMetricsReporter.init(props);
+
+    Assert.assertTrue(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e"));
+
+    kafkaTimelineMetricsReporter.stopReporter();
+    verifyAll();
+  }
+
   private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
     TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
     kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
@@ -106,4 +127,5 @@ public class KafkaTimelineMetricsReporterTest {
     EasyMock.expectLastCall().once();
     return timelineMetricsCache;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
index 41f9126..de8026f 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.metrics2.sink.kafka;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,18 +38,10 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class ScheduledReporterTest {
   private final Gauge gauge = mock(Gauge.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
index c666de0..d3ffe22 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
 
   <properties>
     <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
-    <storm.version>0.10.0.2.3.2.0-2650</storm.version>
+    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
   </properties>
 
   <build>

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 36339c5..3a49e0a 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -82,7 +82,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
     for (DataPoint dataPoint : dataPoints) {
       if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
-        LOG.info(dataPoint.name + " = " + dataPoint.value);
+        LOG.debug(dataPoint.name + " = " + dataPoint.value);
         TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
             taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
         // Put intermediate values into the cache until it is time to send

http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
index 610ab14..cab9d3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
@@ -119,7 +119,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
 
   private void executeTopologyDDLUpdates() throws AmbariException, SQLException {
     dbAccessor.addColumn(TOPOLOGY_REQUEST_TABLE, new DBColumnInfo(TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN,
-        Long.class, null, null, true));
+      Long.class, null, null, true));
     // TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN will be deleted in PreDML. We need a cluster name to set cluster id.
     // dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
     // dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
@@ -140,7 +140,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
     dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
     dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
     dbAccessor.addFKConstraint(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_FK_CONSTRAINT_NAME,
-        TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
+      TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
   }
 
   /**
@@ -196,6 +196,21 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
     updateHiveConfigs();
     updateOozieConfigs();
     updateHbaseAndClusterConfigurations();
+    updateKafkaConfigurations();
+  }
+
+  protected void updateKafkaConfigurations() throws AmbariException {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("external.kafka.metrics.exclude.prefix",
+      "kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory," +
+        "kafka.server.BrokerTopicMetrics.BytesRejectedPerSec");
+    properties.put("external.kafka.metrics.include.prefix",
+      "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile," +
+        "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile," +
+        "kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile," +
+        "kafka.network.RequestMetrics.RequestsPerSec.request");
+
+    updateConfigurationProperties("kafka-broker", properties, false, false);
   }
 
   protected void updateHbaseAndClusterConfigurations() throws AmbariException {