You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/01 17:44:34 UTC
[1/2] beam git commit: [BEAM-351] Add DisplayData to KafkaIO
Repository: beam
Updated Branches:
refs/heads/master d84b06791 -> 3b3d6b81a
[BEAM-351] Add DisplayData to KafkaIO
Changes after review.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52e2d3a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52e2d3a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52e2d3a7
Branch: refs/heads/master
Commit: 52e2d3a77096460ae4a10ac977b4897a1eecf3a1
Parents: d84b067
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Feb 26 22:39:28 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 19:28:26 2017 +0200
----------------------------------------------------------------------
sdks/java/io/kafka/pom.xml | 7 +++
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 49 ++++++++++++++++++
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 53 +++++++++++++++++++-
3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 6935f1e..d5ffe63 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -149,6 +149,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 5fd34b9..890fb2b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -63,12 +64,14 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.values.KV;
@@ -500,6 +503,27 @@ public class KafkaIO {
return new KafkaConsumer<>(config);
}
};
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ List<String> topics = getTopics();
+ List<TopicPartition> topicPartitions = getTopicPartitions();
+ if (topics.size() > 0) {
+ builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
+ } else if (topicPartitions.size() > 0) {
+ builder.add(DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
+ .withLabel("Topic Partition/s"));
+ }
+ Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
+ for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
+ String key = conf.getKey();
+ if (!ignoredConsumerPropertiesKeys.contains(key)) {
+ builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+ }
+ }
+ }
}
/**
@@ -527,6 +551,12 @@ public class KafkaIO {
}
}));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ read.populateDisplayData(builder);
+ }
}
////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1222,6 +1252,19 @@ public class KafkaIO {
configForKeySerializer(), "Reserved for internal serializer",
configForValueSerializer(), "Reserved for internal serializer"
);
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
+ Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
+ for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
+ String key = conf.getKey();
+ if (!ignoredProducerPropertiesKeys.contains(key)) {
+ builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+ }
+ }
+ }
}
/**
@@ -1248,6 +1291,12 @@ public class KafkaIO {
.setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder()))
.apply(kvWriteTransform);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ kvWriteTransform.populateDisplayData(builder);
+ }
}
private static class NullOnlyCoder<T> extends AtomicCoder<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9d7c27b..1897127 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -223,7 +226,7 @@ public class KafkaIOTest {
List<String> topics = ImmutableList.of("topic_a", "topic_b");
KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
- .withBootstrapServers("none")
+ .withBootstrapServers("myServer1:9092,myServer2:9092")
.withTopics(topics)
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
@@ -619,6 +622,54 @@ public class KafkaIOTest {
}
}
+ @Test
+ public void testSourceDisplayData() {
+ KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+ assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+ assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+ assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+ }
+
+ @Test
+ public void testSourceWithExplicitPartitionsDisplayData() {
+ KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
+ .withBootstrapServers("myServer1:9092,myServer2:9092")
+ .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
+ new TopicPartition("test", 6)))
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
+ .withKeyCoder(ByteArrayCoder.of())
+ .withValueCoder(BigEndianLongCoder.of());
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
+ assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+ assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+ assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+ }
+
+ @Test
+ public void testSinkDisplayData() {
+ KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
+ .withBootstrapServers("myServerA:9092,myServerB:9092")
+ .withTopic("myTopic")
+ .withValueCoder(BigEndianLongCoder.of())
+ .withProducerFactoryFn(new ProducerFactoryFn());
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+ assertThat(displayData, hasDisplayItem("retries", 3));
+ }
+
private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
// verify that appropriate messages are written to kafka
[2/2] beam git commit: This closes #2111
Posted by am...@apache.org.
This closes #2111
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b3d6b81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b3d6b81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b3d6b81
Branch: refs/heads/master
Commit: 3b3d6b81a5fe0f09aaa52340fdc9e671b46e4d57
Parents: d84b067 52e2d3a
Author: Sela <an...@paypal.com>
Authored: Wed Mar 1 19:29:05 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 19:29:05 2017 +0200
----------------------------------------------------------------------
sdks/java/io/kafka/pom.xml | 7 +++
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 49 ++++++++++++++++++
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 53 +++++++++++++++++++-
3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------