You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:14 UTC
[02/50] [abbrv] beam git commit: Add timeout to initialization of
partition in KafkaIO
Add timeout to initialization of partition in KafkaIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c167d109
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c167d109
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c167d109
Branch: refs/heads/DSL_SQL
Commit: c167d10968b1bbd4f959f93ab3bcd4f76576c823
Parents: 4862703
Author: Raghu Angadi <ra...@google.com>
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:00:59 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 81 +++++++++++++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 ++++++++
2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c167d109/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e520367..026313a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -1061,8 +1063,32 @@ public class KafkaIO {
curBatch = Iterators.cycle(nonEmpty);
}
+ private void setupInitialOffset(PartitionState pState) {
+ Read<K, V> spec = source.spec;
+
+ if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+ consumer.seek(pState.topicPartition, pState.nextOffset);
+ } else {
+ // nextOffset is uninitialized here, meaning start reading from latest record as of now
+ // ('latest' is the default, and is configurable) or look up the offset by 'startReadTime'.
+ // Remember the current position without waiting until the first record is read. This
+ // ensures checkpoint is accurate even if the reader is closed before reading any records.
+ Instant startReadTime = spec.getStartReadTime();
+ if (startReadTime != null) {
+ pState.nextOffset =
+ consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
+ consumer.seek(pState.topicPartition, pState.nextOffset);
+ } else {
+ pState.nextOffset = consumer.position(pState.topicPartition);
+ }
+ }
+ }
+
@Override
public boolean start() throws IOException {
+ final int defaultPartitionInitTimeout = 60 * 1000;
+ final int kafkaRequestTimeoutMultiple = 2;
+
Read<K, V> spec = source.spec;
consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1077,25 +1103,38 @@ public class KafkaIO {
keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
- for (PartitionState p : partitionStates) {
- if (p.nextOffset != UNINITIALIZED_OFFSET) {
- consumer.seek(p.topicPartition, p.nextOffset);
- } else {
- // nextOffset is unininitialized here, meaning start reading from latest record as of now
- // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
- // Remember the current position without waiting until the first record is read. This
- // ensures checkpoint is accurate even if the reader is closed before reading any records.
- Instant startReadTime = spec.getStartReadTime();
- if (startReadTime != null) {
- p.nextOffset =
- consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
- consumer.seek(p.topicPartition, p.nextOffset);
- } else {
- p.nextOffset = consumer.position(p.topicPartition);
+ // Seek to start offset for each partition. This is the first interaction with the server.
+ // Unfortunately it can block forever in case of network issues like incorrect ACLs.
+ // Initialize the partition in a separate thread and cancel it if it takes longer than a minute.
+ for (final PartitionState pState : partitionStates) {
+ Future<?> future = consumerPollThread.submit(new Runnable() {
+ public void run() {
+ setupInitialOffset(pState);
}
- }
+ });
- LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
+ try {
+ // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
+ Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout
+ : defaultPartitionInitTimeout,
+ TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ consumer.wakeup(); // This unblocks consumer stuck on network I/O.
+ // Likely reason : Kafka servers are configured to advertise internal ips, but
+ // those ips are not accessible from workers outside.
+ String msg = String.format(
+ "%s: Timeout while initializing partition '%s'. "
+ + "Kafka client may not be able to connect to servers.",
+ this, pState.topicPartition);
+ LOG.error("{}", msg);
+ throw new IOException(msg);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ LOG.info("{}: reading from {} starting at offset {}",
+ name, pState.topicPartition, pState.nextOffset);
}
// Start consumer read loop.
@@ -1329,8 +1368,12 @@ public class KafkaIO {
// might block to enqueue right after availableRecordsQueue.poll() below.
while (!isShutdown) {
- consumer.wakeup();
- offsetConsumer.wakeup();
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ if (offsetConsumer != null) {
+ offsetConsumer.wakeup();
+ }
availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
try {
isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)
http://git-wip-us.apache.org/repos/asf/beam/blob/c167d109/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index b69bc83..482f5a2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -364,6 +365,35 @@ public class KafkaIOTest {
}
@Test
+ public void testUnreachableKafkaBrokers() {
+ // Expect an exception when the Kafka brokers are not reachable on the workers.
+ // We specify partitions explicitly so that splitting does not involve server interaction.
+ // Set request timeout to 10ms so that test does not take long.
+
+ thrown.expect(Exception.class);
+ thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
+
+ int numElements = 1000;
+ PCollection<Long> input = p
+ .apply(KafkaIO.<Integer, Long>read()
+ .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
+ .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .updateConsumerProperties(ImmutableMap.<String, Object>of(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
+ ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
+ ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
+ ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
+ .withMaxNumRecords(10)
+ .withoutMetadata())
+ .apply(Values.<Long>create());
+
+ addCountingAsserts(input, numElements);
+ p.run();
+ }
+
+ @Test
public void testUnboundedSourceWithSingleTopic() {
// same as testUnboundedSource, but with single topic