Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/30 22:07:53 UTC
[1/2] beam git commit: [BEAM-2248] KafkaIO support to use start read time to set start offset
Repository: beam
Updated Branches:
refs/heads/master 49067b164 -> 94d677dc9
[BEAM-2248] KafkaIO support to use start read time to set start offset
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d29e353e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d29e353e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d29e353e
Branch: refs/heads/master
Commit: d29e353ea53349e3c94285fdf5b29318252087d1
Parents: 49067b1
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed May 10 19:49:04 2017 +0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 30 14:58:20 2017 -0700
----------------------------------------------------------------------
sdks/java/io/kafka/pom.xml | 2 +-
.../apache/beam/sdk/io/kafka/ConsumerSpEL.java | 56 +++++++++
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 44 ++++++-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 122 ++++++++++++++++---
4 files changed, 205 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index f6f0385..29350cc 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -30,7 +30,7 @@
<description>Library to read Kafka topics.</description>
<properties>
- <kafka.clients.version>0.9.0.1</kafka.clients.version>
+ <kafka.clients.version>0.10.1.0</kafka.clients.version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 8fe17c1..8cdad22 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,12 +17,18 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
+import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
@@ -51,13 +57,28 @@ class ConsumerSpEL {
private Method timestampMethod;
private boolean hasRecordTimestamp = false;
+ private Method offsetGetterMethod;
+ private Method offsetsForTimesMethod;
+ private boolean hasOffsetsForTimes = false;
+
public ConsumerSpEL() {
try {
+ // ConsumerRecord.timestamp() is supported by Kafka Client 0.10.0.0 onwards.
timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
} catch (NoSuchMethodException | SecurityException e) {
LOG.debug("Timestamp for Kafka message is not available.");
}
+
+ try {
+ // Consumer.offsetsForTimes() is supported by Kafka Client 0.10.1.0 onwards.
+ offsetGetterMethod = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp")
+ .getMethod("offset", (Class<?>[]) null);
+ offsetsForTimesMethod = Consumer.class.getMethod("offsetsForTimes", Map.class);
+ hasOffsetsForTimes = offsetsForTimesMethod.getReturnType().equals(Map.class);
+ } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) {
+ LOG.debug("OffsetsForTimes is not available.");
+ }
}
public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
@@ -88,4 +109,39 @@ class ConsumerSpEL {
}
return timestamp;
}
+
+ public boolean hasOffsetsForTimes() {
+ return hasOffsetsForTimes;
+ }
+
+ /**
+ * Look up the offset for the given partition by timestamp.
+ * Throws RuntimeException if there are no messages with a timestamp at or after the given
+ * time, or if this partition does not support timestamp-based offset lookup.
+ */
+ @SuppressWarnings("unchecked")
+ public long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
+
+ checkArgument(hasOffsetsForTimes,
+ "This Kafka Client must support Consumer.OffsetsForTimes().");
+
+ Map<TopicPartition, Long> timestampsToSearch =
+ ImmutableMap.of(topicPartition, time.getMillis());
+ try {
+ Map offsetsByTimes = (Map) offsetsForTimesMethod.invoke(consumer, timestampsToSearch);
+ Object offsetAndTimestamp = Iterables.getOnlyElement(offsetsByTimes.values());
+
+ if (offsetAndTimestamp == null) {
+ throw new RuntimeException("No messages in this partition have a timestamp that is "
+ + "greater than or equal to the target time, or the message format version in this "
+ + "partition is before 0.10.0; topicPartition is: " + topicPartition);
+ } else {
+ return (long) offsetGetterMethod.invoke(offsetAndTimestamp);
+ }
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
}
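For reference, on Kafka client 0.10.1.0 or newer the reflective lookup above is equivalent to calling Consumer.offsetsForTimes() directly. The following is a minimal sketch of that direct call, not part of this commit; the helper name directOffsetForTime is hypothetical:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Iterables;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.joda.time.Instant;

    static long directOffsetForTime(Consumer<?, ?> consumer, TopicPartition tp, Instant time) {
      // Ask the broker for the earliest offset whose timestamp is >= time.
      Map<TopicPartition, OffsetAndTimestamp> offsets =
          consumer.offsetsForTimes(ImmutableMap.of(tp, time.getMillis()));
      OffsetAndTimestamp result = Iterables.getOnlyElement(offsets.values());
      if (result == null) {
        // Null means no message at or after the timestamp, or a pre-0.10.0 message format.
        throw new RuntimeException("No offset for " + tp + " at or after " + time);
      }
      return result.offset();
    }

The reflection in ConsumerSpEL exists only so that KafkaIO still compiles and runs against older client versions, where neither the method nor the OffsetAndTimestamp class is present.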
http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index a1130fc..4d2a358 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -101,6 +101,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -306,6 +307,8 @@ public class KafkaIO {
abstract long getMaxNumRecords();
@Nullable abstract Duration getMaxReadTime();
+ @Nullable abstract Instant getStartReadTime();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
@@ -324,6 +327,7 @@ public class KafkaIO {
abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
+ abstract Builder<K, V> setStartReadTime(Instant startReadTime);
abstract Read<K, V> build();
}
@@ -448,6 +452,24 @@ public class KafkaIO {
}
/**
+ * Use timestamp to set up start offset.
+ * It is only supported by Kafka Client 0.10.1.0 onwards, and requires the topic's message
+ * format version to be 0.10.0 or later.
+ *
+ * <p>Note that this takes priority over the start offset configuration
+ * {@code ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto-committed offsets.
+ *
+ * <p>This results in hard failures in either of the following two cases:
+ * 1. If one or more partitions do not contain any messages with a timestamp larger than or
+ * equal to the desired timestamp.
+ * 2. If the message format version in a partition is before 0.10.0, i.e. the messages do
+ * not have timestamps.
+ */
+ public Read<K, V> withStartReadTime(Instant startReadTime) {
+ return toBuilder().setStartReadTime(startReadTime).build();
+ }
+
+ /**
* Similar to
* {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
* Mainly used for tests and demo applications.
@@ -508,6 +530,13 @@ public class KafkaIO {
"Kafka topics or topic_partitions are required");
checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
checkNotNull(getValueDeserializer(), "Value deserializer must be set");
+ if (getStartReadTime() != null) {
+ checkArgument(new ConsumerSpEL().hasOffsetsForTimes(),
+ "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+ + "current version of Kafka Client is " + AppInfoParser.getVersion()
+ + ". If you are building with maven, set \"kafka.clients.version\" "
+ + "maven property to 0.10.1.0 or newer.");
+ }
}
@Override
@@ -1041,10 +1070,17 @@ public class KafkaIO {
consumer.seek(p.topicPartition, p.nextOffset);
} else {
// nextOffset is uninitialized here, meaning start reading from the latest record as of now
- // ('latest' is the default, and is configurable). Remember the current position without
- // waiting until the first record read. This ensures checkpoint is accurate even if the
- // reader is closed before reading any records.
- p.nextOffset = consumer.position(p.topicPartition);
+ // ('latest' is the default, and is configurable), or look up the offset by startReadTime.
+ // Remember the current position without waiting until the first record is read. This
+ // ensures checkpoint is accurate even if the reader is closed before reading any records.
+ Instant startReadTime = spec.getStartReadTime();
+ if (startReadTime != null) {
+ p.nextOffset =
+ consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
+ consumer.seek(p.topicPartition, p.nextOffset);
+ } else {
+ p.nextOffset = consumer.position(p.topicPartition);
+ }
}
LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
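From a user's perspective, the new withStartReadTime() composes with the existing Read builder like any other option. A minimal sketch of a pipeline using it, where the bootstrap server, topic name, deserializers, and timestamp are placeholders rather than anything prescribed by this commit:

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Instant;

    Pipeline p = Pipeline.create();
    PCollection<KV<String, Long>> records = p.apply(
        KafkaIO.<String, Long>read()
            .withBootstrapServers("broker-1:9092")          // placeholder
            .withTopics(ImmutableList.of("my_topic"))       // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            // Seek each partition to the first offset whose timestamp is >= this instant.
            .withStartReadTime(new Instant(1496102400000L)) // arbitrary epoch millis
            .withoutMetadata());

Because validate() now checks hasOffsetsForTimes(), a pipeline built this way fails fast with a clear error message when an older kafka-clients jar is on the classpath, instead of failing mid-read.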
http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 691f7f4..b69bc83 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -24,11 +24,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -150,7 +153,7 @@ public class KafkaIOTest {
}
int numPartitions = partitions.size();
- long[] offsets = new long[numPartitions];
+ final long[] offsets = new long[numPartitions];
for (int i = 0; i < numElements; i++) {
int pIdx = i % numPartitions;
@@ -184,6 +187,36 @@ public class KafkaIOTest {
updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
}
}
+ // Override offsetsForTimes() in order to look up offsets by timestamp.
+ // The '@Override' keyword is omitted here because Kafka client versions before 0.10.1.0
+ // do not have this method.
+ // This should return Map<TopicPartition, OffsetAndTimestamp>, but versions before
+ // 0.10.1.0 do not have the OffsetAndTimestamp class, so return a raw type and use
+ // reflection here.
+ @SuppressWarnings("unchecked")
+ public Map offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+ HashMap<TopicPartition, Object> result = new HashMap<>();
+ try {
+ Class<?> cls = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp");
+ // OffsetAndTimestamp(long offset, long timestamp)
+ Constructor constructor = cls.getDeclaredConstructor(long.class, long.class);
+
+ // In test scope, timestamp == offset.
+ for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+ long maxOffset = offsets[partitions.indexOf(entry.getKey())];
+ Long offset = entry.getValue();
+ if (offset >= maxOffset) {
+ offset = null;
+ }
+ result.put(
+ entry.getKey(), offset == null ? null : constructor.newInstance(offset, offset));
+ }
+ return result;
+ } catch (ClassNotFoundException | IllegalAccessException
+ | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
};
for (String topic : topics) {
@@ -239,12 +272,19 @@ public class KafkaIOTest {
}
}
+ private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
+ int numElements,
+ @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+ return mkKafkaReadTransform(numElements, numElements, timestampFn);
+ }
+
/**
* Creates a consumer with two topics, with 10 partitions each.
* numElements are (round-robin) assigned to all 20 partitions.
*/
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements,
+ int maxNumRecords,
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
List<String> topics = ImmutableList.of("topic_a", "topic_b");
@@ -256,7 +296,7 @@ public class KafkaIOTest {
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
- .withMaxNumRecords(numElements);
+ .withMaxNumRecords(maxNumRecords);
if (timestampFn != null) {
return reader.withTimestampFn(timestampFn);
@@ -283,22 +323,31 @@ public class KafkaIOTest {
public static void addCountingAsserts(PCollection<Long> input, long numElements) {
// Count == numElements
- PAssert
- .thatSingleton(input.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
// Unique count == numElements
- PAssert
- .thatSingleton(input.apply(Distinct.<Long>create())
- .apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
// Min == 0
- PAssert
- .thatSingleton(input.apply("Min", Min.<Long>globally()))
- .isEqualTo(0L);
// Max == numElements-1
+ addCountingAsserts(input, numElements, numElements, 0L, numElements - 1);
+ }
+
+ public static void addCountingAsserts(
+ PCollection<Long> input, long count, long uniqueCount, long min, long max) {
+
+ PAssert
+ .thatSingleton(input.apply("Count", Count.<Long>globally()))
+ .isEqualTo(count);
+
+ PAssert
+ .thatSingleton(input.apply(Distinct.<Long>create())
+ .apply("UniqueCount", Count.<Long>globally()))
+ .isEqualTo(uniqueCount);
+
PAssert
- .thatSingleton(input.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
+ .thatSingleton(input.apply("Min", Min.<Long>globally()))
+ .isEqualTo(min);
+
+ PAssert
+ .thatSingleton(input.apply("Max", Max.<Long>globally()))
+ .isEqualTo(max);
}
@Test
@@ -749,6 +798,51 @@ public class KafkaIOTest {
}
@Test
+ public void testUnboundedSourceStartReadTime() {
+
+ assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());
+
+ int numElements = 1000;
+ // In this MockConsumer, an element's timestamp equals its offset, and there are 20
+ // partitions. So this startTime results in reading exactly half of the elements.
+ int startTime = numElements / 20 / 2;
+ int maxNumRecords = numElements / 2;
+
+ PCollection<Long> input = p
+ .apply(mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn())
+ .withStartReadTime(new Instant(startTime))
+ .withoutMetadata())
+ .apply(Values.<Long>create());
+
+ addCountingAsserts(input, maxNumRecords, maxNumRecords, maxNumRecords, numElements - 1);
+ p.run();
+
+ }
+
+ @Rule public ExpectedException noMessagesException = ExpectedException.none();
+
+ @Test
+ public void testUnboundedSourceStartReadTimeException() {
+
+ assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());
+
+ noMessagesException.expect(RuntimeException.class);
+
+ int numElements = 1000;
+ // In this MockConsumer, an element's timestamp equals its offset, and there are 20
+ // partitions. So this startTime is past every element, and nothing can be read.
+ int startTime = numElements / 20;
+
+ p.apply(mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn())
+ .withStartReadTime(new Instant(startTime))
+ .withoutMetadata())
+ .apply(Values.<Long>create());
+
+ p.run();
+
+ }
+
+ @Test
public void testSourceDisplayData() {
KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
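As a sanity check on the numbers in testUnboundedSourceStartReadTime: the expected counts follow directly from the mock's timestamp == offset convention. A small sketch of the arithmetic (commentary, not part of the patch), with illustrative variable names:

    int numElements = 1000;
    int numPartitions = 20;                                       // two topics x 10 partitions
    long perPartition = numElements / numPartitions;              // 50 records per partition
    long startTime = perPartition / 2;                            // 25; timestamps equal offsets
    long remaining = (perPartition - startTime) * numPartitions;  // 500 records are read
    // Values are assigned round-robin across partitions, so the records at offsets >= 25
    // are exactly the values 500..999: count == 500, min == 500, max == 999, matching
    // addCountingAsserts(input, maxNumRecords, maxNumRecords, maxNumRecords, numElements - 1).

In the exception test, startTime is 50, one past the last per-partition offset (49), so offsetsForTimes() returns null for every partition and offsetForTime() raises the expected RuntimeException.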
[2/2] beam git commit: This closes #3044: [BEAM-2248] KafkaIO support to use start read time to set start offset
Posted by jk...@apache.org.
This closes #3044: [BEAM-2248] KafkaIO support to use start read time to set start offset
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94d677dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94d677dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94d677dc
Branch: refs/heads/master
Commit: 94d677dc97330b424ce960310f23e7992f67bfd6
Parents: 49067b1 d29e353
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 30 14:58:28 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 30 14:58:28 2017 -0700
----------------------------------------------------------------------
sdks/java/io/kafka/pom.xml | 2 +-
.../apache/beam/sdk/io/kafka/ConsumerSpEL.java | 56 +++++++++
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 44 ++++++-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 122 ++++++++++++++++---
4 files changed, 205 insertions(+), 19 deletions(-)
----------------------------------------------------------------------