You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/09/23 02:40:05 UTC
[2/2] ambari git commit: AMBARI-13173. Provide Kafka metrics filter
to reduce write volume. (swagle)
AMBARI-13173. Provide Kafka metrics filter to reduce write volume. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b1fb3e3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b1fb3e3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b1fb3e3
Branch: refs/heads/branch-2.1.2
Commit: 3b1fb3e3452c1f9c6f81202012af58874b8e370c
Parents: 69cce9c
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Sep 22 17:39:44 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Sep 22 17:40:02 2015 -0700
----------------------------------------------------------------------
.../kafka/KafkaTimelineMetricsReporter.java | 72 +-
.../kafka/KafkaTimelineMetricsReporterTest.java | 66 +-
.../sink/kafka/ScheduledReporterTest.java | 31 +-
.../ambari-metrics-storm-sink/pom.xml | 2 +-
.../sink/storm/StormTimelineMetricsSink.java | 2 +-
.../server/upgrade/UpgradeCatalog212.java | 19 +-
.../package/files/service-metrics/KAFKA.txt | 1058 ++----------------
.../KAFKA/configuration/kafka-broker.xml | 14 +
8 files changed, 225 insertions(+), 1039 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index a259864..dd7604b 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -33,8 +33,9 @@ import com.yammer.metrics.stats.Snapshot;
import kafka.metrics.KafkaMetricsConfig;
import kafka.metrics.KafkaMetricsReporter;
import kafka.utils.VerifiableProperties;
-import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
@@ -46,7 +47,11 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -64,6 +69,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
+ private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
+ private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "8188";
@@ -76,6 +83,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private TimelineMetricsCache metricsCache;
private int timeoutSeconds = 10;
+ private String[] excludedMetricsPrefixes;
+ private String[] includedMetricsPrefixes;
+ // Local cache to avoid prefix matching everytime
+ private Set<String> excludedMetrics = new HashSet<>();
+
@Override
protected String getCollectorUri() {
return collectorUri;
@@ -108,14 +120,28 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
+
+ // Exclusion policy
+ String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
+ if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
+ excludedMetricsPrefixes = excludedMetricsStr.trim().split(",");
+ }
+ // Inclusion override
+ String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
+ if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
+ includedMetricsPrefixes = includedMetricsStr.trim().split(",");
+ }
+
initializeReporter();
if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
startReporter(metricsConfig.pollingIntervalSecs());
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("CollectorUri = " + collectorUri);
- LOG.trace("MetricsSendInterval = " + metricsSendInterval);
- LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CollectorUri = " + collectorUri);
+ LOG.debug("MetricsSendInterval = " + metricsSendInterval);
+ LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
+ LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
+ LOG.debug("Included metrics prefixes = " + includedMetricsStr);
}
}
}
@@ -156,6 +182,24 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
public List<TimelineMetric> getTimelineMetricList();
}
+ protected boolean isExcludedMetric(String metricName) {
+ if (excludedMetrics.contains(metricName)) {
+ return true;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("metricName => " + metricName +
+ ", exclude: " + StringUtils.startsWithAny(metricName, excludedMetricsPrefixes) +
+ ", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
+ }
+ if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
+ if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) {
+ excludedMetrics.add(metricName);
+ return true;
+ }
+ }
+ return false;
+ }
+
class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
private static final String APP_ID = "kafka_broker";
@@ -187,21 +231,21 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
final MetricName metricName = entry.getKey();
final Metric metric = entry.getValue();
Context context = new Context() {
-
public List<TimelineMetric> getTimelineMetricList() {
return metricsList;
}
-
};
metric.processWith(this, metricName, context);
}
} catch (Throwable t) {
LOG.error("Exception processing Kafka metric", t);
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Metrics List size: " + metricsList.size());
- LOG.trace("Metics Set size: " + metrics.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Metrics List size: " + metricsList.size());
+ LOG.debug("Metics Set size: " + metrics.size());
+ LOG.debug("Excluded metrics set: " + excludedMetrics);
}
+
if (!metricsList.isEmpty()) {
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(metricsList);
@@ -333,7 +377,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
- NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
+ NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
return new String[] { medianName,
ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName,
@@ -343,7 +387,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) {
final String meterName = sanitizedName + suffix;
final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue);
- metricsCache.putTimelineMetric(metric);
+ // Skip cache if we decide not to include the metric
+ // Cannot do this before calculations of percentiles
+ if (!isExcludedMetric(meterName)) {
+ metricsCache.putTimelineMetric(metric);
+ }
return meterName;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index 67c61e1..70f4850 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -18,18 +18,16 @@
package org.apache.hadoop.metrics2.sink.kafka;
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import junit.framework.Assert;
import kafka.utils.VerifiableProperties;
-
import org.apache.commons.httpclient.HttpClient;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
@@ -40,19 +38,19 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ Metrics.class, HttpClient.class })
-@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ Metrics.class, HttpClient.class,
+ KafkaTimelineMetricsReporter.TimelineScheduledReporter.class })
+@PowerMockIgnore({"javax.management.*", "org.apache.log4j.*", "org.slf4j.*"})
public class KafkaTimelineMetricsReporterTest {
private final List<Metric> list = new ArrayList<Metric>();
@@ -81,6 +79,8 @@ public class KafkaTimelineMetricsReporterTest {
properties.setProperty("kafka.timeline.metrics.host", "localhost");
properties.setProperty("kafka.timeline.metrics.port", "8188");
properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+ properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
+ properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
props = new VerifiableProperties(properties);
}
@@ -98,6 +98,27 @@ public class KafkaTimelineMetricsReporterTest {
verifyAll();
}
+ @Test
+ public void testMetricsExclusionPolicy() throws Exception {
+ mockStatic(Metrics.class);
+ EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+ TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+ kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+ HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ kafkaTimelineMetricsReporter.setHttpClient(httpClient);
+
+ replay(Metrics.class, httpClient, timelineMetricsCache);
+ kafkaTimelineMetricsReporter.init(props);
+
+ Assert.assertTrue(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c"));
+ Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b"));
+ Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d"));
+ Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e"));
+
+ kafkaTimelineMetricsReporter.stopReporter();
+ verifyAll();
+ }
+
private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
@@ -106,4 +127,5 @@ public class KafkaTimelineMetricsReporterTest {
EasyMock.expectLastCall().once();
return timelineMetricsCache;
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
index 41f9126..de8026f 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
@@ -18,10 +18,17 @@
package org.apache.hadoop.metrics2.sink.kafka;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,18 +38,10 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class ScheduledReporterTest {
private final Gauge gauge = mock(Gauge.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
index c666de0..d3ffe22 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
<properties>
<!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
- <storm.version>0.10.0.2.3.2.0-2650</storm.version>
+ <storm.version>0.10.0.2.3.0.0-2557</storm.version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 36339c5..3a49e0a 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -82,7 +82,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
for (DataPoint dataPoint : dataPoints) {
if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
- LOG.info(dataPoint.name + " = " + dataPoint.value);
+ LOG.debug(dataPoint.name + " = " + dataPoint.value);
TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
// Put intermediate values into the cache until it is time to send
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b1fb3e3/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
index 610ab14..cab9d3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
@@ -119,7 +119,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
private void executeTopologyDDLUpdates() throws AmbariException, SQLException {
dbAccessor.addColumn(TOPOLOGY_REQUEST_TABLE, new DBColumnInfo(TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN,
- Long.class, null, null, true));
+ Long.class, null, null, true));
// TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN will be deleted in PreDML. We need a cluster name to set cluster id.
// dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
// dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
@@ -140,7 +140,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
dbAccessor.dropColumn(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_NAME_COLUMN);
dbAccessor.setColumnNullable(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, false);
dbAccessor.addFKConstraint(TOPOLOGY_REQUEST_TABLE, TOPOLOGY_REQUEST_CLUSTER_ID_FK_CONSTRAINT_NAME,
- TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
+ TOPOLOGY_REQUEST_CLUSTER_ID_COLUMN, CLUSTERS_TABLE, CLUSTERS_TABLE_CLUSTER_ID_COLUMN, false);
}
/**
@@ -196,6 +196,21 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
updateHiveConfigs();
updateOozieConfigs();
updateHbaseAndClusterConfigurations();
+ updateKafkaConfigurations();
+ }
+
+ protected void updateKafkaConfigurations() throws AmbariException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("external.kafka.metrics.exclude.prefix",
+ "kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory," +
+ "kafka.server.BrokerTopicMetrics.BytesRejectedPerSec");
+ properties.put("external.kafka.metrics.include.prefix",
+ "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile," +
+ "kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile," +
+ "kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile," +
+ "kafka.network.RequestMetrics.RequestsPerSec.request");
+
+ updateConfigurationProperties("kafka-broker", properties, false, false);
}
protected void updateHbaseAndClusterConfigurations() throws AmbariException {