You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/31 22:08:11 UTC
[1/2] beam git commit: Added VoidSerializer for KafkaIO. Modified
KafkaIO.Write.values() to auto add the VoidSerializer for the key.serializer
config for kafka producer
Repository: beam
Updated Branches:
refs/heads/master 710941e90 -> 54bc58bd0
Added VoidSerializer for KafkaIO. Modified KafkaIO.Write.values() to auto add the VoidSerializer for the key.serializer config for kafka producer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39fdace7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39fdace7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39fdace7
Branch: refs/heads/master
Commit: 39fdace70457850631d8a57cf9a6906220435fd3
Parents: 710941e
Author: nerdynick <ne...@gmail.com>
Authored: Tue Oct 10 12:04:12 2017 -0600
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Oct 31 14:41:04 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++++++++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 11 ++++-------
2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/39fdace7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 17e0e34..f6158ca 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -128,6 +128,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
@@ -252,7 +253,7 @@ import org.slf4j.LoggerFactory;
* strings.apply(KafkaIO.<Void, String>write()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
- * .withValueSerializer(new StringSerializer()) // just need serializer for value
+ * .withValueSerializer(StringSerializer.class) // just need serializer for value
* .values()
* );
* }</pre>
@@ -1598,8 +1599,13 @@ public class KafkaIO {
* Writes just the values to Kafka. This is useful for writing collections of values rather
* than {@link KV}s.
*/
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public PTransform<PCollection<V>, PDone> values() {
- return new KafkaValueWrite<>(toBuilder().build());
+ return new KafkaValueWrite<>(
+ toBuilder()
+ .setKeySerializer((Class) StringSerializer.class)
+ .build()
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/39fdace7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 89748dd..2cbd448 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -1196,13 +1196,10 @@ public class KafkaIOTest {
public Producer<Integer, Long> apply(Map<String, Object> config) {
// Make sure the config is correctly set up for serializers.
- // There may not be a key serializer if we're interested only in values.
- if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
- Utils.newInstance(
- ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
- .asSubclass(Serializer.class)
- ).configure(config, true);
- }
+ Utils.newInstance(
+ ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+ .asSubclass(Serializer.class)
+ ).configure(config, true);
Utils.newInstance(
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
[2/2] beam git commit: This closes #3969
Posted by ch...@apache.org.
This closes #3969
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54bc58bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54bc58bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54bc58bd
Branch: refs/heads/master
Commit: 54bc58bd095eec01d070378cd93e5e94ad5cec0f
Parents: 710941e 39fdace
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Oct 31 15:07:29 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Oct 31 15:07:29 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++++++++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 11 ++++-------
2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------