Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/30 22:07:53 UTC

[1/2] beam git commit: [BEAM-2248] KafkaIO support to use start read time to set start offset

Repository: beam
Updated Branches:
  refs/heads/master 49067b164 -> 94d677dc9


[BEAM-2248] KafkaIO support to use start read time to set start offset


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d29e353e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d29e353e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d29e353e

Branch: refs/heads/master
Commit: d29e353ea53349e3c94285fdf5b29318252087d1
Parents: 49067b1
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed May 10 19:49:04 2017 +0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 30 14:58:20 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../apache/beam/sdk/io/kafka/ConsumerSpEL.java  |  56 +++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  44 ++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 122 ++++++++++++++++---
 4 files changed, 205 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index f6f0385..29350cc 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -30,7 +30,7 @@
   <description>Library to read Kafka topics.</description>
 
   <properties>
-    <kafka.clients.version>0.9.0.1</kafka.clients.version>
+    <kafka.clients.version>0.10.1.0</kafka.clients.version>
   </properties>
 
   <build>

http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 8fe17c1..8cdad22 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,12 +17,18 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.expression.Expression;
@@ -51,13 +57,28 @@ class ConsumerSpEL {
   private Method timestampMethod;
   private boolean hasRecordTimestamp = false;
 
+  private Method offsetGetterMethod;
+  private Method offsetsForTimesMethod;
+  private boolean hasOffsetsForTimes = false;
+
   public ConsumerSpEL() {
     try {
+      // ConsumerRecord.timestamp() is supported by Kafka Client 0.10.0.0 onwards.
       timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
       hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
     } catch (NoSuchMethodException | SecurityException e) {
       LOG.debug("Timestamp for Kafka message is not available.");
     }
+
+    try {
+      // Consumer.offsetsForTimes() is supported by Kafka Client 0.10.1.0 onwards.
+      offsetGetterMethod = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp")
+          .getMethod("offset", (Class<?>[]) null);
+      offsetsForTimesMethod = Consumer.class.getMethod("offsetsForTimes", Map.class);
+      hasOffsetsForTimes = offsetsForTimesMethod.getReturnType().equals(Map.class);
+    } catch (NoSuchMethodException | SecurityException | ClassNotFoundException e) {
+      LOG.debug("OffsetsForTimes is not available.");
+    }
   }
 
   public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
@@ -88,4 +109,39 @@ class ConsumerSpEL {
     }
     return timestamp;
   }
+
+  public boolean hasOffsetsForTimes() {
+    return hasOffsetsForTimes;
+  }
+
+  /**
+   * Look up the offset for the given partition by timestamp.
+   * Throws RuntimeException if there are no messages with a timestamp at or after the given
+   * time, or if this partition does not support timestamp-based offset lookup.
+   */
+  @SuppressWarnings("unchecked")
+  public long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
+
+    checkArgument(hasOffsetsForTimes,
+        "This Kafka client must support Consumer.offsetsForTimes().");
+
+    Map<TopicPartition, Long> timestampsToSearch =
+        ImmutableMap.of(topicPartition, time.getMillis());
+    try {
+      Map offsetsByTimes = (Map) offsetsForTimesMethod.invoke(consumer, timestampsToSearch);
+      Object offsetAndTimestamp = Iterables.getOnlyElement(offsetsByTimes.values());
+
+      if (offsetAndTimestamp == null) {
+        throw new RuntimeException("There are no messages with a timestamp greater than or "
+            + "equal to the target time, or the message format version in this partition is "
+            + "before 0.10.0; topicPartition is: " + topicPartition);
+      } else {
+        return (long) offsetGetterMethod.invoke(offsetAndTimestamp);
+      }
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
 }
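
For reference, the reflection in ConsumerSpEL.offsetForTime above resolves to a plain
offsetsForTimes() call when compiled directly against kafka-clients 0.10.1.0 or newer. A minimal
sketch of that direct form (the class name is illustrative and not part of this commit):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

class OffsetForTimeSketch {
  // Returns the earliest offset whose record timestamp is at or after 'time' for the partition.
  static long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
    Map<TopicPartition, OffsetAndTimestamp> found =
        consumer.offsetsForTimes(Collections.singletonMap(topicPartition, time.getMillis()));
    OffsetAndTimestamp offsetAndTimestamp = found.get(topicPartition);
    if (offsetAndTimestamp == null) {
      // Same condition ConsumerSpEL.offsetForTime reports: no message at or after 'time',
      // or the partition's message format predates 0.10.0.
      throw new RuntimeException("No offset found for " + topicPartition + " at " + time);
    }
    return offsetAndTimestamp.offset();
  }
}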

http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index a1130fc..4d2a358 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -101,6 +101,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -306,6 +307,8 @@ public class KafkaIO {
     abstract long getMaxNumRecords();
     @Nullable abstract Duration getMaxReadTime();
 
+    @Nullable abstract Instant getStartReadTime();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -324,6 +327,7 @@ public class KafkaIO {
       abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
       abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
       abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
+      abstract Builder<K, V> setStartReadTime(Instant startReadTime);
 
       abstract Read<K, V> build();
     }
@@ -448,6 +452,24 @@ public class KafkaIO {
     }
 
     /**
+     * Use a timestamp to set up the start offset.
+     * This is supported only by Kafka Client 0.10.1.0 onwards and requires the message format
+     * version to be 0.10.0 or later.
+     *
+     * <p>Note that this takes priority over the start offset configuration
+     * {@code ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto-committed offsets.
+     *
+     * <p>This results in hard failures in either of the following two cases:
+     * 1. If one or more partitions do not contain any messages with a timestamp larger than or
+     * equal to the desired timestamp.
+     * 2. If the message format version in a partition is before 0.10.0, i.e. the messages do
+     * not have timestamps.
+     */
+    public Read<K, V> withStartReadTime(Instant startReadTime) {
+      return toBuilder().setStartReadTime(startReadTime).build();
+    }
+
+    /**
      * Similar to
      * {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
      * Mainly used for tests and demo
@@ -508,6 +530,13 @@ public class KafkaIO {
           "Kafka topics or topic_partitions are required");
       checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
       checkNotNull(getValueDeserializer(), "Value deserializer must be set");
+      if (getStartReadTime() != null) {
+        checkArgument(new ConsumerSpEL().hasOffsetsForTimes(),
+            "Consumer.offsetsForTimes() is only supported by Kafka Client 0.10.1.0 onwards; "
+                + "the current Kafka Client version is " + AppInfoParser.getVersion()
+                + ". If you are building with Maven, set the \"kafka.clients.version\" "
+                + "Maven property to 0.10.1.0 or newer.");
+      }
     }
 
     @Override
@@ -1041,10 +1070,17 @@ public class KafkaIO {
           consumer.seek(p.topicPartition, p.nextOffset);
         } else {
          // nextOffset is uninitialized here, meaning start reading from latest record as of now
-          // ('latest' is the default, and is configurable). Remember the current position without
-          // waiting until the first record read. This ensures checkpoint is accurate even if the
-          // reader is closed before reading any records.
-          p.nextOffset = consumer.position(p.topicPartition);
+          // ('latest' is the default, and is configurable), or look up the offset by
+          // startReadTime. Remember the current position without waiting until the first record
+          // is read. This ensures the checkpoint is accurate even if the reader is closed before
+          // reading any records.
+          Instant startReadTime = spec.getStartReadTime();
+          if (startReadTime != null) {
+            p.nextOffset =
+                consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
+            consumer.seek(p.topicPartition, p.nextOffset);
+          } else {
+            p.nextOffset = consumer.position(p.topicPartition);
+          }
         }
 
         LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
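
With the new withStartReadTime() in place, a pipeline can ask KafkaIO to start from a wall-clock
instant instead of the committed or auto-reset offset. A minimal usage sketch, where the bootstrap
server, topic name, key/value types, and timestamp are placeholders rather than values taken from
this commit:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;

public class StartReadTimeExample {
  static PCollection<KV<Long, String>> readFromStartTime(Pipeline p) {
    return p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092")    // placeholder broker address
            .withTopic("events")                      // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Requires kafka-clients 0.10.1.0+ and message format 0.10.0+; takes priority over
            // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and any committed offsets.
            .withStartReadTime(Instant.parse("2017-05-30T00:00:00Z"))
            .withoutMetadata());
  }
}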

http://git-wip-us.apache.org/repos/asf/beam/blob/d29e353e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 691f7f4..b69bc83 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -24,11 +24,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -150,7 +153,7 @@ public class KafkaIOTest {
     }
 
     int numPartitions = partitions.size();
-    long[] offsets = new long[numPartitions];
+    final long[] offsets = new long[numPartitions];
 
     for (int i = 0; i < numElements; i++) {
       int pIdx = i % numPartitions;
@@ -184,6 +187,36 @@ public class KafkaIOTest {
               updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
             }
           }
+          // Override offsetsForTimes() in order to look up offsets by timestamp.
+          // The '@Override' keyword is omitted because Kafka client versions before 0.10.1.0
+          // do not have this method.
+          // This should return Map<TopicPartition, OffsetAndTimestamp>, but versions before
+          // 0.10.1.0 do not have the OffsetAndTimestamp class, so return a raw type and use
+          // reflection here.
+          @SuppressWarnings("unchecked")
+          public Map offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+            HashMap<TopicPartition, Object> result = new HashMap<>();
+            try {
+              Class<?> cls = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp");
+              // OffsetAndTimestamp(long offset, long timestamp)
+              Constructor constructor = cls.getDeclaredConstructor(long.class, long.class);
+
+              // In test scope, timestamp == offset.
+              for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+                long maxOffset = offsets[partitions.indexOf(entry.getKey())];
+                Long offset = entry.getValue();
+                if (offset >= maxOffset) {
+                  // Like the real consumer, report "no offset found" with a null map value when
+                  // no message has a timestamp at or after the requested time.
+                  offset = null;
+                }
+                result.put(
+                    entry.getKey(),
+                    offset == null ? null : constructor.newInstance(offset, entry.getValue()));
+              }
+              return result;
+            } catch (ClassNotFoundException | IllegalAccessException
+                | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
+          }
         };
 
     for (String topic : topics) {
@@ -239,12 +272,19 @@ public class KafkaIOTest {
     }
   }
 
+  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
+      int numElements,
+      @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+    return mkKafkaReadTransform(numElements, numElements, timestampFn);
+  }
+
   /**
    * Creates a consumer with two topics, with 10 partitions each.
     * numElements are assigned (round-robin) across all 20 partitions.
    */
   private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
+      int maxNumRecords,
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
 
     List<String> topics = ImmutableList.of("topic_a", "topic_b");
@@ -256,7 +296,7 @@ public class KafkaIOTest {
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
         .withKeyDeserializer(IntegerDeserializer.class)
         .withValueDeserializer(LongDeserializer.class)
-        .withMaxNumRecords(numElements);
+        .withMaxNumRecords(maxNumRecords);
 
     if (timestampFn != null) {
       return reader.withTimestampFn(timestampFn);
@@ -283,22 +323,31 @@ public class KafkaIOTest {
 
   public static void addCountingAsserts(PCollection<Long> input, long numElements) {
     // Count == numElements
-    PAssert
-      .thatSingleton(input.apply("Count", Count.<Long>globally()))
-      .isEqualTo(numElements);
     // Unique count == numElements
-    PAssert
-      .thatSingleton(input.apply(Distinct.<Long>create())
-                          .apply("UniqueCount", Count.<Long>globally()))
-      .isEqualTo(numElements);
     // Min == 0
-    PAssert
-      .thatSingleton(input.apply("Min", Min.<Long>globally()))
-      .isEqualTo(0L);
     // Max == numElements-1
+    addCountingAsserts(input, numElements, numElements, 0L, numElements - 1);
+  }
+
+  public static void addCountingAsserts(
+      PCollection<Long> input, long count, long uniqueCount, long min, long max) {
+
+    PAssert
+        .thatSingleton(input.apply("Count", Count.<Long>globally()))
+        .isEqualTo(count);
+
+    PAssert
+        .thatSingleton(input.apply(Distinct.<Long>create())
+            .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(uniqueCount);
+
     PAssert
-      .thatSingleton(input.apply("Max", Max.<Long>globally()))
-      .isEqualTo(numElements - 1);
+        .thatSingleton(input.apply("Min", Min.<Long>globally()))
+        .isEqualTo(min);
+
+    PAssert
+        .thatSingleton(input.apply("Max", Max.<Long>globally()))
+        .isEqualTo(max);
   }
 
   @Test
@@ -749,6 +798,51 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceStartReadTime() {
+
+    assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());
+
+    int numElements = 1000;
+    // In this MockConsumer, each element's timestamp equals its offset, and there are 20
+    // partitions. With this startTime, half of the elements are read.
+    int startTime = numElements / 20 / 2;
+    int maxNumRecords = numElements / 2;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn())
+            .withStartReadTime(new Instant(startTime))
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, maxNumRecords, maxNumRecords, maxNumRecords, numElements - 1);
+    p.run();
+
+  }
+
+  @Rule public ExpectedException noMessagesException = ExpectedException.none();
+
+  @Test
+  public void testUnboundedSourceStartReadTimeException() {
+
+    assumeTrue(new ConsumerSpEL().hasOffsetsForTimes());
+
+    noMessagesException.expect(RuntimeException.class);
+
+    int numElements = 1000;
+    // In this MockConsumer, each element's timestamp equals its offset, and there are 20
+    // partitions. With this startTime, no elements can be read.
+    int startTime = numElements / 20;
+
+    p.apply(mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn())
+            .withStartReadTime(new Instant(startTime))
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    p.run();
+
+  }
+
+  @Test
   public void testSourceDisplayData() {
     KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
 


[2/2] beam git commit: This closes #3044: [BEAM-2248] KafkaIO support to use start read time to set start offset

Posted by jk...@apache.org.
This closes #3044: [BEAM-2248] KafkaIO support to use start read time to set start offset


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94d677dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94d677dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94d677dc

Branch: refs/heads/master
Commit: 94d677dc97330b424ce960310f23e7992f67bfd6
Parents: 49067b1 d29e353
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 30 14:58:28 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 30 14:58:28 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../apache/beam/sdk/io/kafka/ConsumerSpEL.java  |  56 +++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  44 ++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 122 ++++++++++++++++---
 4 files changed, 205 insertions(+), 19 deletions(-)
----------------------------------------------------------------------