You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/06 19:00:05 UTC
[1/2] beam git commit: Add timeout to initialization of partition in
KafkaIO
Repository: beam
Updated Branches:
refs/heads/master 85a99e294 -> b8f8d18ae
Add timeout to initialization of partition in KafkaIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/526037b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/526037b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/526037b6
Branch: refs/heads/master
Commit: 526037b6786315b9f9fdca6edb636baeb6f83e3f
Parents: 85a99e2
Author: Raghu Angadi <ra...@google.com>
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 11:58:41 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 81 +++++++++++++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 ++++++++
2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e520367..026313a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -1061,8 +1063,32 @@ public class KafkaIO {
curBatch = Iterators.cycle(nonEmpty);
}
+ private void setupInitialOffset(PartitionState pState) {
+ Read<K, V> spec = source.spec;
+
+ if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+ consumer.seek(pState.topicPartition, pState.nextOffset);
+ } else {
+ // nextOffset is uninitialized here, meaning start reading from latest record as of now
+ // ('latest' is the default, and is configurable) or look up offset by startReadTime.
+ // Remember the current position without waiting until the first record is read. This
+ // ensures checkpoint is accurate even if the reader is closed before reading any records.
+ Instant startReadTime = spec.getStartReadTime();
+ if (startReadTime != null) {
+ pState.nextOffset =
+ consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
+ consumer.seek(pState.topicPartition, pState.nextOffset);
+ } else {
+ pState.nextOffset = consumer.position(pState.topicPartition);
+ }
+ }
+ }
+
@Override
public boolean start() throws IOException {
+ final int defaultPartitionInitTimeout = 60 * 1000;
+ final int kafkaRequestTimeoutMultiple = 2;
+
Read<K, V> spec = source.spec;
consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1077,25 +1103,38 @@ public class KafkaIO {
keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
- for (PartitionState p : partitionStates) {
- if (p.nextOffset != UNINITIALIZED_OFFSET) {
- consumer.seek(p.topicPartition, p.nextOffset);
- } else {
- // nextOffset is unininitialized here, meaning start reading from latest record as of now
- // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
- // Remember the current position without waiting until the first record is read. This
- // ensures checkpoint is accurate even if the reader is closed before reading any records.
- Instant startReadTime = spec.getStartReadTime();
- if (startReadTime != null) {
- p.nextOffset =
- consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
- consumer.seek(p.topicPartition, p.nextOffset);
- } else {
- p.nextOffset = consumer.position(p.topicPartition);
+ // Seek to start offset for each partition. This is the first interaction with the server.
+ // Unfortunately it can block forever in case of network issues like incorrect ACLs.
+ // Initialize partition in a separate thread and cancel it if it takes longer than a minute.
+ for (final PartitionState pState : partitionStates) {
+ Future<?> future = consumerPollThread.submit(new Runnable() {
+ public void run() {
+ setupInitialOffset(pState);
}
- }
+ });
- LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
+ try {
+ // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
+ Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout
+ : defaultPartitionInitTimeout,
+ TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ consumer.wakeup(); // This unblocks consumer stuck on network I/O.
+ // Likely reason : Kafka servers are configured to advertise internal ips, but
+ // those ips are not accessible from workers outside.
+ String msg = String.format(
+ "%s: Timeout while initializing partition '%s'. "
+ + "Kafka client may not be able to connect to servers.",
+ this, pState.topicPartition);
+ LOG.error("{}", msg);
+ throw new IOException(msg);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ LOG.info("{}: reading from {} starting at offset {}",
+ name, pState.topicPartition, pState.nextOffset);
}
// Start consumer read loop.
@@ -1329,8 +1368,12 @@ public class KafkaIO {
// might block to enqueue right after availableRecordsQueue.poll() below.
while (!isShutdown) {
- consumer.wakeup();
- offsetConsumer.wakeup();
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ if (offsetConsumer != null) {
+ offsetConsumer.wakeup();
+ }
availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
try {
isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)
http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index b69bc83..482f5a2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -364,6 +365,35 @@ public class KafkaIOTest {
}
@Test
+ public void testUnreachableKafkaBrokers() {
+ // Expect an exception when the Kafka brokers are not reachable on the workers.
+ // We specify partitions explicitly so that splitting does not involve server interaction.
+ // Set request timeout to 10ms so that test does not take long.
+
+ thrown.expect(Exception.class);
+ thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
+
+ int numElements = 1000;
+ PCollection<Long> input = p
+ .apply(KafkaIO.<Integer, Long>read()
+ .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
+ .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .updateConsumerProperties(ImmutableMap.<String, Object>of(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
+ ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
+ ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
+ ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
+ .withMaxNumRecords(10)
+ .withoutMetadata())
+ .apply(Values.<Long>create());
+
+ addCountingAsserts(input, numElements);
+ p.run();
+ }
+
+ @Test
public void testUnboundedSourceWithSingleTopic() {
// same as testUnboundedSource, but with single topic
[2/2] beam git commit: This closes #3492: [BEAM-2551] KafkaIO reader
blocks indefinitely in case of network issues
Posted by ke...@apache.org.
This closes #3492: [BEAM-2551] KafkaIO reader blocks indefinitely in case of network issues
Add timeout to initialization of partition in KafkaIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8f8d18a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8f8d18a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8f8d18a
Branch: refs/heads/master
Commit: b8f8d18ae2cdbb4874d9a0f45038037ecc2381d1
Parents: 85a99e2 526037b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 11:59:18 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 11:59:18 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 81 +++++++++++++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 ++++++++
2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------