You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/22 20:29:30 UTC
samza git commit: SAMZA-733: added metrics for the Elasticsearch
Producer
Repository: samza
Updated Branches:
refs/heads/master b22909855 -> ff7bc452f
SAMZA-733: added metrics for the Elasticsearch Producer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff7bc452
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff7bc452
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff7bc452
Branch: refs/heads/master
Commit: ff7bc452f261e6c1f18b964568d544ee4f925756
Parents: b229098
Author: Roger Hoover <ro...@gmail.com>
Authored: Wed Jul 22 11:29:23 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 22 11:29:23 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/metrics/MetricGroup.java | 68 ++++++++++++++++++++
.../org/apache/samza/metrics/MetricsBase.java | 53 +++++++++++++++
.../apache/samza/metrics/MetricsHelper.scala | 22 +++----
.../ElasticsearchSystemFactory.java | 3 +-
.../ElasticsearchSystemProducer.java | 24 ++++++-
.../ElasticsearchSystemProducerMetrics.java | 37 +++++++++++
.../ElasticsearchSystemProducerMetricsTest.java | 53 +++++++++++++++
.../ElasticsearchSystemProducerTest.java | 5 +-
8 files changed, 251 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
new file mode 100644
index 0000000..53526d8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -0,0 +1,68 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+/**
+ * MetricGroup is a little helper class to make it easy to register and
+ * manage a group of counters, gauges and timers. It's shared between Java
+ * and Scala
+ */
+public class MetricGroup {
+
+ public interface ValueFunction<T> {
+ T getValue();
+ }
+
+ protected final MetricsRegistry registry;
+ protected final String groupName;
+ protected final String prefix;
+
+ public MetricGroup(String groupName, String prefix, MetricsRegistry registry) {
+ this.groupName = groupName;
+ this.registry = registry;
+ this.prefix = prefix;
+ }
+
+ public Counter newCounter(String name) {
+ return registry.newCounter(groupName, (prefix + name).toLowerCase());
+ }
+
+ public <T> Gauge<T> newGauge(String name, T value) {
+ return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
+ }
+
+ /*
+ * Specify a dynamic gauge that always returns the latest value when polled.
+ * The value closure/object must be thread safe, since metrics reporters may access
+ * it from another thread.
+ */
+ public <T> Gauge<T> newGauge(String name, final ValueFunction<T> valueFunc) {
+ return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), valueFunc.getValue()) {
+ @Override
+ public T getValue() {
+ return valueFunc.getValue();
+ }
+ });
+ }
+
+ public Timer newTimer(String name) {
+ return registry.newTimer(groupName, (prefix + name).toLowerCase());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
new file mode 100644
index 0000000..7db3b65
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+/**
+ * A base class for metrics. The name of the class that extends the
+ * base class will be used as the metric group name
+ */
+public abstract class MetricsBase {
+ protected final MetricGroup group;
+
+ public MetricsBase(String prefix, MetricsRegistry registry) {
+ String groupName = this.getClass().getName();
+ group = new MetricGroup(groupName, prefix, registry);
+ }
+
+ public MetricsBase(MetricsRegistry registry) {
+ this("", registry);
+ }
+
+ public Counter newCounter(String name) {
+ return group.newCounter(name);
+ }
+
+ public <T> Gauge<T> newGauge(String name, T value) {
+ return group.newGauge(name, value);
+ }
+
+ public <T> Gauge<T> newGauge(String name, final MetricGroup.ValueFunction<T> valueFunc) {
+ return group.newGauge(name, valueFunc);
+ }
+
+ public Timer newTimer(String name) {
+ return group.newTimer(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 8eac8ef..1520b0e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -19,22 +19,26 @@
package org.apache.samza.metrics
+import org.apache.samza.metrics.MetricGroup.ValueFunction
+
/**
* MetricsHelper is a little helper class to make it easy to register and
* manage counters, gauges and timers.
+ *
+ * The name of the class that extends this trait will be used as the
+ * metric group name
*/
trait MetricsHelper {
val group = this.getClass.getName
val registry: MetricsRegistry
+ val metricGroup = new MetricGroup(group, getPrefix, registry)
- def newCounter(name: String) = {
- registry.newCounter(group, (getPrefix + name).toLowerCase)
- }
+ def newCounter(name: String) = metricGroup.newCounter(name)
- def newGauge[T](name: String, value: T) = {
- registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value))
- }
+ def newTimer(name: String) = metricGroup.newTimer(name)
+
+ def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
/**
* Specify a dynamic gauge that always returns the latest value when polled.
@@ -42,15 +46,11 @@ trait MetricsHelper {
* it from another thread.
*/
def newGauge[T](name: String, value: () => T) = {
- registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) {
+ metricGroup.newGauge(name, new ValueFunction[T] {
override def getValue = value()
})
}
- def newTimer(name: String) = {
- registry.newTimer(group, (getPrefix + name).toLowerCase)
- }
-
/**
* Returns a prefix for metric names.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
index a277b69..d8ca70e 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -55,7 +55,8 @@ public class ElasticsearchSystemFactory implements SystemFactory {
return new ElasticsearchSystemProducer(name,
getBulkProcessorFactory(elasticsearchConfig),
getClient(elasticsearchConfig),
- getIndexRequestFactory(elasticsearchConfig));
+ getIndexRequestFactory(elasticsearchConfig),
+ new ElasticsearchSystemProducerMetrics(name, metricsRegistry));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index 7eb14a2..f61bd36 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -23,10 +23,13 @@ import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,16 +64,19 @@ public class ElasticsearchSystemProducer implements SystemProducer {
private final IndexRequestFactory indexRequestFactory;
private final BulkProcessorFactory bulkProcessorFactory;
+ private final ElasticsearchSystemProducerMetrics metrics;
private Client client;
public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
- Client client, IndexRequestFactory indexRequestFactory) {
+ Client client, IndexRequestFactory indexRequestFactory,
+ ElasticsearchSystemProducerMetrics metrics) {
this.system = system;
this.sourceBulkProcessor = new HashMap<>();
this.bulkProcessorFactory = bulkProcessorFactory;
this.client = client;
this.indexRequestFactory = indexRequestFactory;
+ this.metrics = metrics;
}
@@ -102,6 +108,7 @@ public class ElasticsearchSystemProducer implements SystemProducer {
if (response.hasFailures()) {
sendFailed.set(true);
} else {
+ updateSuccessMetrics(response);
LOGGER.info(String.format("Written %s messages from %s to %s.",
response.getItems().length, source, system));
}
@@ -142,4 +149,19 @@ public class ElasticsearchSystemProducer implements SystemProducer {
LOGGER.info(String.format("Flushed %s to %s.", source, system));
}
+
+ private void updateSuccessMetrics(BulkResponse response) {
+ metrics.bulkSendSuccess.inc();
+ for (BulkItemResponse itemResp: response.getItems()) {
+ ActionResponse resp = itemResp.getResponse();
+ if (resp instanceof IndexResponse) {
+ if (((IndexResponse)resp).isCreated()) {
+ metrics.inserts.inc();
+ }
+ else {
+ metrics.updates.inc();
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
new file mode 100644
index 0000000..e3b635b
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class ElasticsearchSystemProducerMetrics extends MetricsBase {
+ public final Counter bulkSendSuccess;
+ public final Counter inserts;
+ public final Counter updates;
+
+ public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
+ super(systemName + "-", registry);
+
+ bulkSendSuccess = newCounter("bulk-send-success");
+ inserts = newCounter("docs-inserted");
+ updates = newCounter("docs-updated");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
new file mode 100644
index 0000000..980964f
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.metrics.*;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class ElasticsearchSystemProducerMetricsTest {
+
+ public final static String GRP_NAME = "org.apache.samza.system.elasticsearch.ElasticsearchSystemProducerMetrics";
+
+ @Test
+ public void testMetrics() {
+ ReadableMetricsRegistry registry = new MetricsRegistryMap();
+ ElasticsearchSystemProducerMetrics metrics = new ElasticsearchSystemProducerMetrics("es", registry);
+ metrics.bulkSendSuccess.inc(29L);
+ metrics.inserts.inc();
+ metrics.updates.inc(7L);
+
+ Set<String> groups = registry.getGroups();
+ assertEquals(1, groups.size());
+ assertEquals(GRP_NAME, groups.toArray()[0]);
+
+ Map<String, Metric> metricMap = registry.getGroup(GRP_NAME);
+ assertEquals(3, metricMap.size());
+ assertEquals(29L, ((Counter) metricMap.get("es-bulk-send-success")).getCount());
+ assertEquals(1L, ((Counter) metricMap.get("es-docs-inserted")).getCount());
+ assertEquals(7L, ((Counter) metricMap.get("es-docs-updated")).getCount());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
index e63d62c..684d7f6 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -20,6 +20,7 @@
package org.apache.samza.system.elasticsearch;
import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
@@ -43,6 +44,7 @@ public class ElasticsearchSystemProducerTest {
private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class);
private static final Client CLIENT = mock(Client.class);
private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class);
+ private static final ElasticsearchSystemProducerMetrics METRICS = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
public static final String SOURCE_ONE = "one";
public static final String SOURCE_TWO = "two";
private SystemProducer producer;
@@ -54,7 +56,8 @@ public class ElasticsearchSystemProducerTest {
producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
BULK_PROCESSOR_FACTORY,
CLIENT,
- INDEX_REQUEST_FACTORY);
+ INDEX_REQUEST_FACTORY,
+ METRICS);
processorOne = mock(BulkProcessor.class);
processorTwo = mock(BulkProcessor.class);