Posted to commits@beam.apache.org by av...@apache.org on 2017/04/29 15:16:36 UTC

[1/2] beam git commit: [BEAM-1398] KafkaIO metrics.

Repository: beam
Updated Branches:
  refs/heads/master 81474aeaf -> 47821ad69


[BEAM-1398] KafkaIO metrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/930c27f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/930c27f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/930c27f5

Branch: refs/heads/master
Commit: 930c27f55fc980702089fe58fdb0edded96a2ac6
Parents: 81474ae
Author: Aviem Zur <av...@gmail.com>
Authored: Tue Mar 28 07:29:53 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat Apr 29 18:08:19 2017 +0300

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  65 +++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 130 +++++++++++++++++++
 2 files changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 47d8281..211f1a4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -69,6 +69,10 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -950,6 +954,13 @@ public class KafkaIO {
     private Deserializer<K> keyDeserializerInstance = null;
     private Deserializer<V> valueDeserializerInstance = null;
 
+    private final Counter elementsRead = SourceMetrics.elementsRead();
+    private final Counter bytesRead = SourceMetrics.bytesRead();
+    private final Counter elementsReadBySplit;
+    private final Counter bytesReadBySplit;
+    private final Gauge backlogBytesOfSplit;
+    private final Gauge backlogElementsOfSplit;
+
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
@@ -1023,10 +1034,18 @@ public class KafkaIO {
 
       synchronized long approxBacklogInBytes() {
        // Note that this is an estimate of the uncompressed backlog.
+        long backlogMessageCount = backlogMessageCount();
+        if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        return (long) (backlogMessageCount * avgRecordSize);
+      }
+
+      synchronized long backlogMessageCount() {
         if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
+        return Math.max(0, (latestOffset - nextOffset));
       }
     }
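
For intuition, the refactor above splits the old byte estimate into a pure message-count backlog plus a byte conversion: backlog bytes are simply the message backlog scaled by a running average of observed record sizes. A worked example with made-up inputs (illustrative values only, mirroring the PartitionState fields used above, not taken from the commit):

    // Hypothetical PartitionState values; chosen for illustration.
    long latestOffset = 1_000_000L;
    long nextOffset = 990_000L;
    double avgRecordSize = 120.0;

    long backlogMessages = Math.max(0, latestOffset - nextOffset);  // 10_000 messages
    long backlogBytes = (long) (backlogMessages * avgRecordSize);   // 1_200_000 bytes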
 
@@ -1065,6 +1084,13 @@ public class KafkaIO {
           partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
         }
       }
+
+      String splitId = String.valueOf(source.id);
+
+      elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
+      bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
+      backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
+      backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
     }
 
     private void consumerPollLoop() {
@@ -1194,6 +1220,9 @@ public class KafkaIO {
         if (curBatch.hasNext()) {
           PartitionState pState = curBatch.next();
 
+          elementsRead.inc();
+          elementsReadBySplit.inc();
+
           if (!pState.recordIter.hasNext()) { // -- (c)
             pState.recordIter = Collections.emptyIterator(); // drop ref
             curBatch.remove();
@@ -1241,6 +1270,8 @@ public class KafkaIO {
           int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
               + (rawRecord.value() == null ? 0 : rawRecord.value().length);
           pState.recordConsumed(offset, recordSize);
+          bytesRead.inc(recordSize);
+          bytesReadBySplit.inc(recordSize);
           return true;
 
         } else { // -- (b)
@@ -1278,6 +1309,19 @@ public class KafkaIO {
       LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
     }
 
+    private void reportBacklog() {
+      long splitBacklogBytes = getSplitBacklogBytes();
+      if (splitBacklogBytes < 0) {
+        splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
+      }
+      backlogBytesOfSplit.set(splitBacklogBytes);
+      long splitBacklogMessages = getSplitBacklogMessageCount();
+      if (splitBacklogMessages < 0) {
+        splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
+      }
+      backlogElementsOfSplit.set(splitBacklogMessages);
+    }
+
     @Override
     public Instant getWatermark() {
       if (curRecord == null) {
@@ -1291,6 +1335,7 @@ public class KafkaIO {
 
     @Override
     public CheckpointMark getCheckpointMark() {
+      reportBacklog();
       return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
@@ -1336,6 +1381,20 @@ public class KafkaIO {
       return backlogBytes;
     }
 
+    private long getSplitBacklogMessageCount() {
+      long backlogCount = 0;
+
+      for (PartitionState p : partitionStates) {
+        long pBacklog = p.backlogMessageCount();
+        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        backlogCount += pBacklog;
+      }
+
+      return backlogCount;
+    }
+
     @Override
     public void close() throws IOException {
       closed.set(true);
@@ -1561,6 +1620,8 @@ public class KafkaIO {
       producer.send(
           new ProducerRecord<K, V>(spec.getTopic(), kv.getKey(), kv.getValue()),
           new SendCallback());
+
+      elementsWritten.inc();
     }
 
     @FinishBundle
@@ -1585,6 +1646,8 @@ public class KafkaIO {
     private transient Exception sendException = null;
     private transient long numSendFailures = 0;
 
+    private final Counter elementsWritten = SinkMetrics.elementsWritten();
+
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
 

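Taken together, the reader-side pattern introduced above is: two source-wide counters (SourceMetrics.elementsRead() / bytesRead()), per-split counters and backlog gauges keyed by the split id, counters bumped once per record delivered from advance(), and the gauges refreshed from reportBacklog() each time a checkpoint mark is taken. A condensed sketch of that pattern with the Kafka specifics stripped out (an illustrative skeleton, not the committed reader; names mirror the diff):

    import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Gauge;
    import org.apache.beam.sdk.metrics.SourceMetrics;

    // Illustrative skeleton only; field and method names mirror the diff.
    class MetricsReportingReaderSketch {
      // Source-wide counters, aggregated across all splits.
      private final Counter elementsRead = SourceMetrics.elementsRead();
      private final Counter bytesRead = SourceMetrics.bytesRead();
      // Per-split counters and gauges, keyed by the split id.
      private final Counter elementsReadBySplit;
      private final Counter bytesReadBySplit;
      private final Gauge backlogBytesOfSplit;
      private final Gauge backlogElementsOfSplit;

      MetricsReportingReaderSketch(String splitId) {
        elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
        bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
        backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
        backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
      }

      // Call once per record delivered by advance().
      void recordDelivered(int recordSize) {
        elementsRead.inc();
        elementsReadBySplit.inc();
        bytesRead.inc(recordSize);
        bytesReadBySplit.inc(recordSize);
      }

      // Call from getCheckpointMark(); negative estimates map to BACKLOG_UNKNOWN.
      void reportBacklog(long backlogBytes, long backlogMessages) {
        backlogBytesOfSplit.set(
            backlogBytes < 0 ? UnboundedReader.BACKLOG_UNKNOWN : backlogBytes);
        backlogElementsOfSplit.set(
            backlogMessages < 0 ? UnboundedReader.BACKLOG_UNKNOWN : backlogMessages);
      }
    }
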
http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d713d90..feb65da 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -41,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -50,7 +53,17 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricMatchers;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -90,6 +103,7 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -550,6 +564,76 @@ public class KafkaIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testUnboundedSourceMetrics() {
+    int numElements = 1000;
+
+    String readStep = "readFromKafka";
+
+    p.apply(readStep,
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());
+
+    PipelineResult result = p.run();
+
+    String splitId = "0";
+
+    MetricName elementsRead = SourceMetrics.elementsRead().getName();
+    MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId).getName();
+    MetricName bytesRead = SourceMetrics.bytesRead().getName();
+    MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId).getName();
+    MetricName backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId).getName();
+    MetricName backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId).getName();
+
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().build());
+
+    Iterable<MetricResult<Long>> counters = metrics.counters();
+    Iterable<MetricResult<GaugeResult>> gauges = metrics.gauges();
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            elementsRead.namespace(),
+            elementsRead.name(),
+            readStep,
+            1000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            elementsReadBySplit.namespace(),
+            elementsReadBySplit.name(),
+            readStep,
+            1000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            bytesRead.namespace(),
+            bytesRead.name(),
+            readStep,
+            12000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            bytesReadBySplit.namespace(),
+            bytesReadBySplit.name(),
+            readStep,
+            12000L)));
+
+    assertThat(gauges, hasItem(
+        attemptedMetricsResult(
+            backlogElementsOfSplit.namespace(),
+            backlogElementsOfSplit.name(),
+            readStep,
+            GaugeResult.create(0L, Instant.now()))));
+
+    assertThat(gauges, hasItem(
+        attemptedMetricsResult(
+            backlogBytesOfSplit.namespace(),
+            backlogBytesOfSplit.name(),
+            readStep,
+            GaugeResult.create(0L, Instant.now()))));
+  }
+
+  @Test
   public void testSink() throws Exception {
     // Simply read from the Kafka source and write to the Kafka sink, then verify that
     // the records are correctly published to the mock Kafka producer.
@@ -752,6 +836,52 @@ public class KafkaIOTest {
             instanceof VarLongCoder);
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSinkMetrics() throws Exception {
+    // Read from the Kafka source and write to the Kafka sink, then verify that the metrics are reported.
+
+    int numElements = 1000;
+
+    synchronized (MOCK_PRODUCER_LOCK) {
+
+      MOCK_PRODUCER.clear();
+
+      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+      String topic = "test";
+
+      p
+          .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+              .withoutMetadata())
+          .apply("writeToKafka", KafkaIO.<Integer, Long>write()
+              .withBootstrapServers("none")
+              .withTopic(topic)
+              .withKeySerializer(IntegerSerializer.class)
+              .withValueSerializer(LongSerializer.class)
+              .withProducerFactoryFn(new ProducerFactoryFn()));
+
+      PipelineResult result = p.run();
+
+      MetricName elementsWritten = SinkMetrics.elementsWritten().getName();
+
+      MetricQueryResults metrics = result.metrics().queryMetrics(
+          MetricsFilter.builder()
+              .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
+              .build());
+
+
+      assertThat(metrics.counters(), hasItem(
+          MetricMatchers.attemptedMetricsResult(
+              elementsWritten.namespace(),
+              elementsWritten.name(),
+              "writeToKafka",
+              1000L)));
+
+      completionThread.shutdown();
+    }
+  }
+
   private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
 
     // verify that appropriate messages are written to kafka

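On the consuming side, these metrics are available from any PipelineResult, exactly as the tests above query them. A minimal usage sketch (the Pipeline argument and the wrapper class are assumptions for illustration, not part of the commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;
    import org.apache.beam.sdk.metrics.SourceMetrics;

    class SourceMetricsQuerySketch {
      // Runs an already-constructed pipeline and prints the attempted
      // elementsRead counter for each step that reported it.
      static void printElementsRead(Pipeline p) {
        PipelineResult result = p.run();

        MetricName elementsRead = SourceMetrics.elementsRead().getName();
        MetricQueryResults metrics = result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named(
                    elementsRead.namespace(), elementsRead.name()))
                .build());

        for (MetricResult<Long> counter : metrics.counters()) {
          System.out.printf("%s @ step %s: %d%n",
              counter.name(), counter.step(), counter.attempted());
        }
      }
    }

Note that the tests above match on attempted values (attemptedMetricsResult); support for committed metrics varies by runner, so attempted values are the portable choice.
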

[2/2] beam git commit: This closes #2344

Posted by av...@apache.org.
This closes #2344


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47821ad6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47821ad6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47821ad6

Branch: refs/heads/master
Commit: 47821ad695f67977c775f62b6f8791ca109a7d0b
Parents: 81474ae 930c27f
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Apr 29 18:16:17 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat Apr 29 18:16:17 2017 +0300

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  65 +++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 130 +++++++++++++++++++
 2 files changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------