You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/04/29 15:16:36 UTC
[1/2] beam git commit: [BEAM-1398] KafkaIO metrics.
Repository: beam
Updated Branches:
refs/heads/master 81474aeaf -> 47821ad69
[BEAM-1398] KafkaIO metrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/930c27f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/930c27f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/930c27f5
Branch: refs/heads/master
Commit: 930c27f55fc980702089fe58fdb0edded96a2ac6
Parents: 81474ae
Author: Aviem Zur <av...@gmail.com>
Authored: Tue Mar 28 07:29:53 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat Apr 29 18:08:19 2017 +0300
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 65 +++++++++-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 130 +++++++++++++++++++
2 files changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 47d8281..211f1a4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -69,6 +69,10 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -950,6 +954,13 @@ public class KafkaIO {
private Deserializer<K> keyDeserializerInstance = null;
private Deserializer<V> valueDeserializerInstance = null;
+ private final Counter elementsRead = SourceMetrics.elementsRead();
+ private final Counter bytesRead = SourceMetrics.bytesRead();
+ private final Counter elementsReadBySplit;
+ private final Counter bytesReadBySplit;
+ private final Gauge backlogBytesOfSplit;
+ private final Gauge backlogElementsOfSplit;
+
private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
@@ -1023,10 +1034,18 @@ public class KafkaIO {
synchronized long approxBacklogInBytes() {
// Note that this is an estimate of uncompressed backlog.
+ long backlogMessageCount = backlogMessageCount();
+ if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
+ return UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ return (long) (backlogMessageCount * avgRecordSize);
+ }
+
+ synchronized long backlogMessageCount() {
if (latestOffset < 0 || nextOffset < 0) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
- return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
+ return Math.max(0, (latestOffset - nextOffset));
}
}
@@ -1065,6 +1084,13 @@ public class KafkaIO {
partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
}
}
+
+ String splitId = String.valueOf(source.id);
+
+ elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
+ bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
+ backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
+ backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
}
private void consumerPollLoop() {
@@ -1194,6 +1220,9 @@ public class KafkaIO {
if (curBatch.hasNext()) {
PartitionState pState = curBatch.next();
+ elementsRead.inc();
+ elementsReadBySplit.inc();
+
if (!pState.recordIter.hasNext()) { // -- (c)
pState.recordIter = Collections.emptyIterator(); // drop ref
curBatch.remove();
@@ -1241,6 +1270,8 @@ public class KafkaIO {
int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
pState.recordConsumed(offset, recordSize);
+ bytesRead.inc(recordSize);
+ bytesReadBySplit.inc(recordSize);
return true;
} else { // -- (b)
@@ -1278,6 +1309,19 @@ public class KafkaIO {
LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
}
+ private void reportBacklog() {
+ long splitBacklogBytes = getSplitBacklogBytes();
+ if (splitBacklogBytes < 0) {
+ splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ backlogBytesOfSplit.set(splitBacklogBytes);
+ long splitBacklogMessages = getSplitBacklogMessageCount();
+ if (splitBacklogMessages < 0) {
+ splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ backlogElementsOfSplit.set(splitBacklogMessages);
+ }
+
@Override
public Instant getWatermark() {
if (curRecord == null) {
@@ -1291,6 +1335,7 @@ public class KafkaIO {
@Override
public CheckpointMark getCheckpointMark() {
+ reportBacklog();
return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
Lists.transform(partitionStates,
new Function<PartitionState, PartitionMark>() {
@@ -1336,6 +1381,20 @@ public class KafkaIO {
return backlogBytes;
}
+ private long getSplitBacklogMessageCount() {
+ long backlogCount = 0;
+
+ for (PartitionState p : partitionStates) {
+ long pBacklog = p.backlogMessageCount();
+ if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+ return UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ backlogCount += pBacklog;
+ }
+
+ return backlogCount;
+ }
+
@Override
public void close() throws IOException {
closed.set(true);
@@ -1561,6 +1620,8 @@ public class KafkaIO {
producer.send(
new ProducerRecord<K, V>(spec.getTopic(), kv.getKey(), kv.getValue()),
new SendCallback());
+
+ elementsWritten.inc();
}
@FinishBundle
@@ -1585,6 +1646,8 @@ public class KafkaIO {
private transient Exception sendException = null;
private transient long numSendFailures = 0;
+ private final Counter elementsWritten = SinkMetrics.elementsWritten();
+
KafkaWriter(Write<K, V> spec) {
this.spec = spec;
http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d713d90..feb65da 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -41,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -50,7 +53,17 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricMatchers;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -90,6 +103,7 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -550,6 +564,76 @@ public class KafkaIOTest {
}
@Test
+ @Category(NeedsRunner.class)
+ public void testUnboundedSourceMetrics() {
+ int numElements = 1000;
+
+ String readStep = "readFromKafka";
+
+ p.apply(readStep,
+ mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());
+
+ PipelineResult result = p.run();
+
+ String splitId = "0";
+
+ MetricName elementsRead = SourceMetrics.elementsRead().getName();
+ MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId).getName();
+ MetricName bytesRead = SourceMetrics.bytesRead().getName();
+ MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId).getName();
+ MetricName backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId).getName();
+ MetricName backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId).getName();
+
+ MetricQueryResults metrics = result.metrics().queryMetrics(
+ MetricsFilter.builder().build());
+
+ Iterable<MetricResult<Long>> counters = metrics.counters();
+ Iterable<MetricResult<GaugeResult>> gauges = metrics.gauges();
+
+ assertThat(counters, hasItem(
+ MetricMatchers.attemptedMetricsResult(
+ elementsRead.namespace(),
+ elementsRead.name(),
+ readStep,
+ 1000L)));
+
+ assertThat(counters, hasItem(
+ MetricMatchers.attemptedMetricsResult(
+ elementsReadBySplit.namespace(),
+ elementsReadBySplit.name(),
+ readStep,
+ 1000L)));
+
+ assertThat(counters, hasItem(
+ MetricMatchers.attemptedMetricsResult(
+ bytesRead.namespace(),
+ bytesRead.name(),
+ readStep,
+ 12000L)));
+
+ assertThat(counters, hasItem(
+ MetricMatchers.attemptedMetricsResult(
+ bytesReadBySplit.namespace(),
+ bytesReadBySplit.name(),
+ readStep,
+ 12000L)));
+
+ assertThat(gauges, hasItem(
+ attemptedMetricsResult(
+ backlogElementsOfSplit.namespace(),
+ backlogElementsOfSplit.name(),
+ readStep,
+ GaugeResult.create(0L, Instant.now()))));
+
+ assertThat(gauges, hasItem(
+ attemptedMetricsResult(
+ backlogBytesOfSplit.namespace(),
+ backlogBytesOfSplit.name(),
+ readStep,
+ GaugeResult.create(0L, Instant.now()))));
+ }
+
+ @Test
public void testSink() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the records
// are correctly published to mock kafka producer.
@@ -752,6 +836,52 @@ public class KafkaIOTest {
instanceof VarLongCoder);
}
+ @Test
+ @Category(NeedsRunner.class)
+ public void testSinkMetrics() throws Exception {
+ // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
+
+ int numElements = 1000;
+
+ synchronized (MOCK_PRODUCER_LOCK) {
+
+ MOCK_PRODUCER.clear();
+
+ ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+ String topic = "test";
+
+ p
+ .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+ .withoutMetadata())
+ .apply("writeToKafka", KafkaIO.<Integer, Long>write()
+ .withBootstrapServers("none")
+ .withTopic(topic)
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
+ .withProducerFactoryFn(new ProducerFactoryFn()));
+
+ PipelineResult result = p.run();
+
+ MetricName elementsWritten = SinkMetrics.elementsWritten().getName();
+
+ MetricQueryResults metrics = result.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
+ .build());
+
+
+ assertThat(metrics.counters(), hasItem(
+ MetricMatchers.attemptedMetricsResult(
+ elementsWritten.namespace(),
+ elementsWritten.name(),
+ "writeToKafka",
+ 1000L)));
+
+ completionThread.shutdown();
+ }
+ }
+
private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
// verify that appropriate messages are written to kafka
[2/2] beam git commit: This closes #2344
Posted by av...@apache.org.
This closes #2344
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47821ad6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47821ad6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47821ad6
Branch: refs/heads/master
Commit: 47821ad695f67977c775f62b6f8791ca109a7d0b
Parents: 81474ae 930c27f
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Apr 29 18:16:17 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat Apr 29 18:16:17 2017 +0300
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 65 +++++++++-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 130 +++++++++++++++++++
2 files changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------