You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:43 UTC
[08/13] beam git commit: Add runner check. Add withReadCommitted() to
reader.
Add runner check. Add withReadCommitted() to reader.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e280fad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e280fad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e280fad
Branch: refs/heads/master
Commit: 5e280fad28b97c26f27bfb037ef7f45b8fe034e5
Parents: 265e7c7
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Oct 4 17:43:47 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 31 ++++++++++++++++++++
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 11 ++++---
2 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e280fad/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 76e8a1e..b9b6bd4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -549,6 +549,18 @@ public class KafkaIO {
}
/**
+ * Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This is
+ * ensures that the consumer does not read uncommitted messages. Kafka version 0.11
+ * introduced transactional writes. Applications requiring end-to-end exactly-once
+ * semantics should only read committed messages. See JavaDoc for {@link KafkaConsumer}
+ * for more description.
+ */
+ public Read<K, V> withReadCommitted() {
+ return updateConsumerProperties(
+ ImmutableMap.<String, Object>of("isolation.level", "read_committed"));
+ }
+
+ /**
* Returns a {@link PTransform} for a PCollection of {@link KV}, dropping Kafka metadata.
*/
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
@@ -1275,6 +1287,9 @@ public class KafkaIO {
private void updateLatestOffsets() {
for (PartitionState p : partitionStates) {
try {
+ // If "read_committed" is enabled in the config, this seeks to the 'Last Stable Offset'.
+ // As a result, uncommitted messages are not counted in the backlog. This is correct
+ // since the reader cannot read them anyway.
consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
long offset = offsetConsumer.position(p.topicPartition);
p.setLatestOffset(offset);
@@ -1573,6 +1588,22 @@ public class KafkaIO {
return PDone.in(input.getPipeline());
}
+ public void validate(PipelineOptions options) {
+ if (isEOS()) {
+ String runner = options.getRunner().getName();
+ if (runner.equals("org.apache.beam.runners.direct.DirectRunner")
+ || runner.startsWith("org.apache.beam.runners.dataflow.")
+ || runner.startsWith("org.apache.beam.runners.spark.")) {
+ return;
+ }
+ throw new UnsupportedOperationException(
+ runner + " is not whitelisted among runners compatible with Kafka exactly-once sink. "
+ + "This implementation of exactly-once sink relies on specific checkpoint guarantees. "
+ + "Only the runners with known to have compatible checkpoint semantics are whitelisted."
+ );
+ }
+ }
+
// set config defaults
private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
ImmutableMap.<String, Object>of(
http://git-wip-us.apache.org/repos/asf/beam/blob/5e280fad/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1532788..b1e4a4d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -791,9 +791,13 @@ public class KafkaIOTest {
}
@Test
- public void testEOSink() throws Exception {
- // similar to testSink(), enables to EOS.
- // This does not actually test exactly-once-semantics. Mainly exercises the code.
+ public void testEOSink() {
+ // testSink() with EOS enabled.
+ // This does not actually inject retries in a stage to test exactly-once-semantics.
+ // It mainly exercises the code in normal flow without retries.
+ // Ideally we should test the EOS sink by triggering replays of messages between stages.
+ // It is not feasible to test such retries with direct runner. When DoFnTester supports
+ // state, we can test KafkaEOWriter DoFn directly to ensure it handles retries correctly.
if (!ProducerSpEL.supportsTransactions()) {
LOG.warn("testEOSink() is disabled as Kafka client version does not support transactions.");
@@ -832,7 +836,6 @@ public class KafkaIOTest {
}
}
-
@Test
public void testSinkWithSendErrors() throws Throwable {
// similar to testSink(), except that up to 10 of the send calls to producer will fail