You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/22 20:29:30 UTC

samza git commit: SAMZA-733: added metrics for the Elasticsearch Producer

Repository: samza
Updated Branches:
  refs/heads/master b22909855 -> ff7bc452f


SAMZA-733: added metrics for the Elasticsearch Producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff7bc452
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff7bc452
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff7bc452

Branch: refs/heads/master
Commit: ff7bc452f261e6c1f18b964568d544ee4f925756
Parents: b229098
Author: Roger Hoover <ro...@gmail.com>
Authored: Wed Jul 22 11:29:23 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 22 11:29:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/metrics/MetricGroup.java   | 68 ++++++++++++++++++++
 .../org/apache/samza/metrics/MetricsBase.java   | 53 +++++++++++++++
 .../apache/samza/metrics/MetricsHelper.scala    | 22 +++----
 .../ElasticsearchSystemFactory.java             |  3 +-
 .../ElasticsearchSystemProducer.java            | 24 ++++++-
 .../ElasticsearchSystemProducerMetrics.java     | 37 +++++++++++
 .../ElasticsearchSystemProducerMetricsTest.java | 53 +++++++++++++++
 .../ElasticsearchSystemProducerTest.java        |  5 +-
 8 files changed, 251 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
new file mode 100644
index 0000000..53526d8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -0,0 +1,101 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+import java.util.Locale;
+
+/**
+ * MetricGroup is a little helper class to make it easy to register and
+ * manage a group of counters, gauges and timers.  It's shared between Java
+ * and Scala.
+ *
+ * <p>Every metric is registered under the group name passed to the constructor,
+ * and its name is the prefix followed by the supplied name, lower-cased.
+ */
+public class MetricGroup {
+
+  /**
+   * Supplies the current value of a dynamic gauge each time it is asked for.
+   *
+   * @param <T> type of the value the gauge reports
+   */
+  public interface ValueFunction<T> {
+    T getValue();
+  }
+
+  protected final MetricsRegistry registry;
+  protected final String groupName;
+  protected final String prefix;
+
+  /**
+   * @param groupName group under which every metric created here is registered
+   * @param prefix string prepended to each metric name before lower-casing
+   * @param registry registry the metrics are created in
+   */
+  public MetricGroup(String groupName, String prefix, MetricsRegistry registry) {
+    this.groupName = groupName;
+    this.registry = registry;
+    this.prefix = prefix;
+  }
+
+  /**
+   * Creates a counter in the registry under the prefixed, lower-cased {@code name}.
+   */
+  public Counter newCounter(String name) {
+    return registry.newCounter(groupName, metricName(name));
+  }
+
+  /**
+   * Creates a gauge under the prefixed, lower-cased {@code name}, constructed with {@code value}.
+   */
+  public <T> Gauge<T> newGauge(String name, T value) {
+    return registry.newGauge(groupName, new Gauge<T>(metricName(name), value));
+  }
+
+  /**
+   * Specify a dynamic gauge that always returns the latest value when polled.
+   * The value closure/object must be thread safe, since metrics reporters may access
+   * it from another thread.  Note that {@code valueFunc} is also invoked once
+   * here, when the gauge is constructed.
+   */
+  public <T> Gauge<T> newGauge(String name, final ValueFunction<T> valueFunc) {
+    return registry.newGauge(groupName, new Gauge<T>(metricName(name), valueFunc.getValue()) {
+      @Override
+      public T getValue() {
+        return valueFunc.getValue();
+      }
+    });
+  }
+
+  /**
+   * Creates a timer in the registry under the prefixed, lower-cased {@code name}.
+   */
+  public Timer newTimer(String name) {
+    return registry.newTimer(groupName, metricName(name));
+  }
+
+  /**
+   * Builds the registered metric name.  Locale.ROOT keeps the lower-casing
+   * independent of the JVM default locale (e.g. "I" must map to "i" under tr_TR).
+   */
+  private String metricName(String name) {
+    return (prefix + name).toLowerCase(Locale.ROOT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
new file mode 100644
index 0000000..7db3b65
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java
@@ -0,0 +1,66 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+/**
+ * Base class for metrics holders.  The name of the concrete subclass, as
+ * returned by {@code getClass().getName()}, becomes the metric group name, and
+ * every factory method below delegates to a {@link MetricGroup} built with it.
+ */
+public abstract class MetricsBase {
+  protected final MetricGroup group;
+
+  /**
+   * @param prefix string prepended to the name of every metric created here
+   * @param registry registry the metrics are created in
+   */
+  public MetricsBase(String prefix, MetricsRegistry registry) {
+    this.group = new MetricGroup(getClass().getName(), prefix, registry);
+  }
+
+  /**
+   * Uses an empty prefix, so metric names are registered without one.
+   *
+   * @param registry registry the metrics are created in
+   */
+  public MetricsBase(MetricsRegistry registry) {
+    this("", registry);
+  }
+
+  /** Creates a counter in this group; delegates to {@link MetricGroup}. */
+  public Counter newCounter(String name) {
+    return this.group.newCounter(name);
+  }
+
+  /** Creates a gauge constructed with {@code value}; delegates to {@link MetricGroup}. */
+  public <T> Gauge<T> newGauge(String name, T value) {
+    return this.group.newGauge(name, value);
+  }
+
+  /** Creates a gauge whose value is read from {@code valueFunc}; delegates to {@link MetricGroup}. */
+  public <T> Gauge<T> newGauge(String name, final MetricGroup.ValueFunction<T> valueFunc) {
+    return this.group.newGauge(name, valueFunc);
+  }
+
+  /** Creates a timer in this group; delegates to {@link MetricGroup}. */
+  public Timer newTimer(String name) {
+    return this.group.newTimer(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 8eac8ef..1520b0e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -19,22 +19,26 @@
 
 package org.apache.samza.metrics
 
+import org.apache.samza.metrics.MetricGroup.ValueFunction
+
 
 /**
  * MetricsHelper is a little helper class to make it easy to register and
  * manage counters, gauges and timers.
+ *
+ * The name of the class that extends this trait will be used as the
+ * metric group name
  */
 trait MetricsHelper {
   val group = this.getClass.getName
   val registry: MetricsRegistry
+  val metricGroup = new MetricGroup(group, getPrefix, registry)
 
-  def newCounter(name: String) = {
-    registry.newCounter(group, (getPrefix + name).toLowerCase)
-  }
+  def newCounter(name: String) = metricGroup.newCounter(name)
 
-  def newGauge[T](name: String, value: T) = {
-    registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value))
-  }
+  def newTimer(name: String) = metricGroup.newTimer(name)
+
+  def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
 
   /**
    * Specify a dynamic gauge that always returns the latest value when polled. 
@@ -42,15 +46,11 @@ trait MetricsHelper {
    * it from another thread.
    */
   def newGauge[T](name: String, value: () => T) = {
-    registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) {
+    metricGroup.newGauge(name, new ValueFunction[T] {
       override def getValue = value()
     })
   }
 
-  def newTimer(name: String) = {
-    registry.newTimer(group, (getPrefix + name).toLowerCase)
-  }
-
   /**
    * Returns a prefix for metric names.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
index a277b69..d8ca70e 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -55,7 +55,8 @@ public class ElasticsearchSystemFactory implements SystemFactory {
     return new ElasticsearchSystemProducer(name,
                                            getBulkProcessorFactory(elasticsearchConfig),
                                            getClient(elasticsearchConfig),
-                                           getIndexRequestFactory(elasticsearchConfig));
+                                           getIndexRequestFactory(elasticsearchConfig),
+                                           new ElasticsearchSystemProducerMetrics(name, metricsRegistry));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index 7eb14a2..f61bd36 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -23,10 +23,13 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,16 +64,19 @@ public class ElasticsearchSystemProducer implements SystemProducer {
 
   private final IndexRequestFactory indexRequestFactory;
   private final BulkProcessorFactory bulkProcessorFactory;
+  private final ElasticsearchSystemProducerMetrics metrics;
 
   private Client client;
 
   public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
-                                     Client client, IndexRequestFactory indexRequestFactory) {
+                                     Client client, IndexRequestFactory indexRequestFactory,
+                                     ElasticsearchSystemProducerMetrics metrics) {
     this.system = system;
     this.sourceBulkProcessor = new HashMap<>();
     this.bulkProcessorFactory = bulkProcessorFactory;
     this.client = client;
     this.indexRequestFactory = indexRequestFactory;
+    this.metrics = metrics;
   }
 
 
@@ -102,6 +108,7 @@ public class ElasticsearchSystemProducer implements SystemProducer {
           if (response.hasFailures()) {
             sendFailed.set(true);
           } else {
+            updateSuccessMetrics(response);
             LOGGER.info(String.format("Written %s messages from %s to %s.",
                                       response.getItems().length, source, system));
           }
@@ -142,4 +149,19 @@ public class ElasticsearchSystemProducer implements SystemProducer {
 
     LOGGER.info(String.format("Flushed %s to %s.", source, system));
   }
+
+  /**
+   * Counts one successful bulk send, then tallies each index response in it
+   * as an insert when the document was created and as an update otherwise.
+   */
+  private void updateSuccessMetrics(BulkResponse response) {
+    metrics.bulkSendSuccess.inc();
+    for (BulkItemResponse item : response.getItems()) {
+      ActionResponse itemResponse = item.getResponse();
+      if (!(itemResponse instanceof IndexResponse)) {
+        continue; // responses of any other type are not counted
+      }
+      (((IndexResponse) itemResponse).isCreated() ? metrics.inserts : metrics.updates).inc();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
new file mode 100644
index 0000000..e3b635b
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/** Counters for the Elasticsearch producer, created with the name prefix {@code systemName + "-"}. */
+public class ElasticsearchSystemProducerMetrics extends MetricsBase {
+  public final Counter bulkSendSuccess;
+  public final Counter inserts;
+  public final Counter updates;
+
+  public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
+    super(systemName + "-", registry);
+    this.bulkSendSuccess = newCounter("bulk-send-success");
+    this.inserts = newCounter("docs-inserted");
+    this.updates = newCounter("docs-updated");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
new file mode 100644
index 0000000..980964f
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.metrics.*;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/** Checks the group name and metric names registered by ElasticsearchSystemProducerMetrics. */
+public class ElasticsearchSystemProducerMetricsTest {
+
+  public static final String GRP_NAME = "org.apache.samza.system.elasticsearch.ElasticsearchSystemProducerMetrics";
+
+  @Test
+  public void testMetrics() {
+    ReadableMetricsRegistry registry = new MetricsRegistryMap();
+    ElasticsearchSystemProducerMetrics producerMetrics = new ElasticsearchSystemProducerMetrics("es", registry);
+    producerMetrics.bulkSendSuccess.inc(29L);
+    producerMetrics.inserts.inc();
+    producerMetrics.updates.inc(7L);
+    // The registry must hold exactly one group, named after the metrics class.
+    Set<String> groupNames = registry.getGroups();
+    assertEquals(1, groupNames.size());
+    assertEquals(GRP_NAME, groupNames.iterator().next());
+    // Each counter is looked up under the "es-" prefix and must report what was added above.
+    Map<String, Metric> counters = registry.getGroup(GRP_NAME);
+    assertEquals(3, counters.size());
+    assertEquals(29L, ((Counter) counters.get("es-bulk-send-success")).getCount());
+    assertEquals(1L, ((Counter) counters.get("es-docs-inserted")).getCount());
+    assertEquals(7L, ((Counter) counters.get("es-docs-updated")).getCount());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
index e63d62c..684d7f6 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.elasticsearch;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
@@ -43,6 +44,7 @@ public class ElasticsearchSystemProducerTest {
   private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class);
   private static final Client CLIENT = mock(Client.class);
   private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class);
+  private static final ElasticsearchSystemProducerMetrics METRICS = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
   public static final String SOURCE_ONE = "one";
   public static final String SOURCE_TWO = "two";
   private SystemProducer producer;
@@ -54,7 +56,8 @@ public class ElasticsearchSystemProducerTest {
     producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
                                                BULK_PROCESSOR_FACTORY,
                                                CLIENT,
-                                               INDEX_REQUEST_FACTORY);
+                                               INDEX_REQUEST_FACTORY,
+                                               METRICS);
 
     processorOne = mock(BulkProcessor.class);
     processorTwo = mock(BulkProcessor.class);