You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/08 16:53:23 UTC
[1/2] beam git commit: Ensure Kafka sink serializers are set.
Repository: beam
Updated Branches:
refs/heads/master 0af972095 -> ae45bbd63
Ensure Kafka sink serializers are set.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b413a966
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b413a966
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b413a966
Branch: refs/heads/master
Commit: b413a9665f99599bfd929f850fa67d227ea190d5
Parents: 0af9720
Author: Raghu Angadi <ra...@google.com>
Authored: Fri Oct 20 15:29:20 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Nov 8 08:52:42 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b413a966/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index f6158ca..33fc289 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -928,10 +928,8 @@ public class KafkaIO {
// Backlog support :
// Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
// then look at position(). Use another consumer to do this so that the primary consumer does
- // not need to be interrupted. The latest offsets are fetched periodically on another thread.
- // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
- // auto commit in consumer config, this could interfere with the primary consumer (we will
- // handle this particular problem). We might have to make this optional.
+ // not need to be interrupted. The latest offsets are fetched periodically on a thread. This is
+ // still a bit of a hack, but so far there haven't been any issues reported by the users.
private Consumer<byte[], byte[]> offsetConsumer;
private final ScheduledExecutorService offsetFetcherThread =
Executors.newSingleThreadScheduledExecutor();
@@ -1614,6 +1612,8 @@ public class KafkaIO {
getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
"withBootstrapServers() is required");
checkArgument(getTopic() != null, "withTopic() is required");
+ checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
+ checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
if (isEOS()) {
EOSWrite.ensureEOSSupport();
[2/2] beam git commit: This closes #4034
Posted by ch...@apache.org.
This closes #4034
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae45bbd6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae45bbd6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae45bbd6
Branch: refs/heads/master
Commit: ae45bbd635c898b879efc3251f834428f01dfb57
Parents: 0af9720 b413a96
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Nov 8 08:52:53 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Nov 8 08:52:53 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------