You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/30 16:58:24 UTC

[beam] branch master updated: [BEAM-591] Support custom timestamps & CreateTime support (#4935)

This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 716a269  [BEAM-591] Support custom timestamps & CreateTime support (#4935)
716a269 is described below

commit 716a269aa76eacc7b4e2e911af8eb878c4364abe
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Fri Mar 30 09:58:19 2018 -0700

    [BEAM-591] Support custom timestamps & CreateTime support (#4935)
    
    * Change TimestampPolicyFactory to interface so that it is lambda
    friendly.
    
    * Add a policy for custom timestamps.
    
    * minor
    
    * add Apache license
    
    * JavaDoc update
    
    * Fix JavaDoc errors
---
 sdks/java/io/kafka/pom.xml                         |   8 ++
 .../CustomTimestampPolicyWithLimitedDelay.java     | 101 ++++++++++++++++++
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  27 ++++-
 .../beam/sdk/io/kafka/TimestampPolicyFactory.java  |  73 ++++++-------
 .../CustomTimestampPolicyWithLimitedDelayTest.java | 117 +++++++++++++++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  |  88 ++++++++++++++--
 6 files changed, 358 insertions(+), 56 deletions(-)

diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 42bb0e9..c285d01 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -122,11 +122,19 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
new file mode 100644
index 0000000..f2dbbe8
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A policy for custom record timestamps where timestamps within a partition are expected to be
+ * roughly monotonically increasing with a cap on out of order event delays (say 1 minute).
+ * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
+ * However, watermark is never set in future and capped to 'now - max delay'. In addition,
+ * watermark advanced to 'now - max delay' when a partition is idle.
+ */
+public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
+
+  private final Duration maxDelay;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;
+  private Instant maxEventTimestamp;
+
+  /**
+   * A policy for custom record timestamps where timestamps are expected to be roughly monotonically
+   * increasing with out of order event delays less than {@code maxDelay}. The watermark at any
+   * time is {@code Min(now(), max_event_timestamp) - maxDelay}.
+   * @param timestampFunction A function to extract timestamp from the record
+   * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+   *                 record is expected to be after {@code current record timestamp - maxDelay}.
+   * @param previousWatermark Latest check-pointed watermark, see
+   *                {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+   */
+  public CustomTimestampPolicyWithLimitedDelay(
+    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
+    Duration maxDelay,
+    Optional<Instant> previousWatermark) {
+    this.maxDelay = maxDelay;
+    this.timestampFunction = timestampFunction;
+
+    // 'previousWatermark' is not the same as maxEventTimestamp (e.g. it could have been in future).
+    // Initialize it such that watermark before reading any event same as previousWatermark.
+    maxEventTimestamp = previousWatermark
+      .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)
+      .plus(maxDelay);
+  }
+
+  @Override
+  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
+    Instant ts = timestampFunction.apply(record);
+    if (ts.isAfter(maxEventTimestamp)) {
+      maxEventTimestamp = ts;
+    }
+    return ts;
+  }
+
+  @Override
+  public Instant getWatermark(PartitionContext ctx) {
+    // Watermark == maxEventTime - maxDelay, except in two special cases:
+    //   a) maxEventTime in future : probably due to incorrect timestamps. Cap it to 'now'.
+    //   b) partition is idle : Need to advance watermark if there are no records in the partition.
+    //         We assume that future records will have timestamp >= 'now - maxDelay' and advance
+    //         the watermark accordingly.
+    // The above handles majority of common use cases for custom timestamps. Users can implement
+    // their own policy if this does not work.
+
+    Instant now = Instant.now();
+    return getWatermark(ctx, now);
+  }
+
+  @VisibleForTesting
+  Instant getWatermark(PartitionContext ctx, Instant now) {
+    if (maxEventTimestamp.isAfter(now)) {
+      return now.minus(maxDelay);  // (a) above.
+    } else if (
+      ctx.getMessageBacklog() == 0
+      && ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp) // Idle
+      && maxEventTimestamp.getMillis() > 0) { // Read at least one record with positive timestamp.
+      return ctx.getBacklogCheckTime().minus(maxDelay);
+    } else {
+      return maxEventTimestamp.minus(maxDelay);
+    }
+  }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eeb9da9..11ab29c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -111,8 +111,9 @@ import org.slf4j.LoggerFactory;
  *       // settings for ConsumerConfig. e.g :
  *       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
  *
- *       // set event times and watermark based on LogAppendTime. To provide a custom
+ *       // set event times and watermark based on 'LogAppendTime'. To provide a custom
  *       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
+ *       // Use withCreateTime() with topics that have 'CreateTime' timestamps.
  *       .withLogAppendTime()
  *
  *       // restrict reader to committed messages on Kafka (see method documentation).
@@ -487,12 +488,11 @@ public class KafkaIO {
       return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
     }
 
-
     /**
      * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}.
      * This is the default timestamp policy. It assigns processing time to each record.
      * Specifically, this is the timestamp when the record becomes 'current' in the reader.
-     * The watermark aways advances to current time. If servicer side time (log append time) is
+     * The watermark aways advances to current time. If server side time (log append time) is
      * enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
      */
     public Read<K, V> withProcessingTime() {
@@ -500,10 +500,29 @@ public class KafkaIO {
     }
 
     /**
+     * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
+     * records. It is an error if a record's timestamp type is not
+     * {@link KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to
+     * be roughly monotonically increasing with a cap on out of order delays (e.g. 'max delay' of
+     * 1 minute). The watermark at any time is
+     * '({@code Min(now(), Max(event timestamp so far)) - max delay})'. However, watermark is never
+     * set in future and capped to 'now - max delay'. In addition, watermark advanced to
+     * 'now - max delay' when a partition is idle.
+     *
+     * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+     *                 record is expected to be after {@code current record timestamp - maxDelay}.
+     */
+    public Read<K, V> withCreateTime(Duration maxDelay) {
+      return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
+    }
+
+    /**
      * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
      * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
      * is invoked for each partition when the reader starts.
-     * @see #withLogAppendTime() and {@link #withProcessingTime()}
+     * @see #withLogAppendTime()
+     * @see #withCreateTime(Duration)
+     * @see #withProcessingTime()
      */
     public Read<K, V> withTimestampPolicyFactory(
       TimestampPolicyFactory<K, V> timestampPolicyFactory) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
index d84bfe8..8feccb6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.Serializable;
 import java.util.Optional;
 import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
@@ -31,7 +33,8 @@ import org.joda.time.Instant;
  * the the reader while starting or resuming from a checkpoint. Two commonly used policies are
  * provided. See {@link #withLogAppendTime()} and {@link #withProcessingTime()}.
  */
-public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializable {
+@FunctionalInterface
+public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable {
 
   /**
    * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while
@@ -42,9 +45,8 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
    *           is resuming from a checkpoint. This is a good value to return by implementations
    *           of {@link TimestampPolicy#getWatermark(PartitionContext)} until a better watermark
    *           can be established as more records are read.
-   * @return
    */
-  public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+  TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
     TopicPartition tp, Optional<Instant> previousWatermark);
 
   /**
@@ -52,14 +54,8 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
    * Specifically, this is the timestamp when the record becomes 'current' in the reader.
    * The watermark aways advances to current time.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new ProcessingTimePolicy<>();
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+    return (tp, prev) -> new ProcessingTimePolicy<>();
   }
 
   /**
@@ -68,51 +64,42 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
-    //return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new LogAppendTimePolicy<>(previousWatermark);
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+    return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
   }
 
-  /*
-   * TODO
-   * Provide a another built in implementation where the watermark is based on all the timestamps
-   * seen in last 1 minute of wall clock time (this duration could be configurable). This is
-   * similar to watermark set by PubsubIO.
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
-   *   return withCustomTypestamp(...);
-   * }
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
-   * }
+  /**
+   * {@link CustomTimestampPolicyWithLimitedDelay} using {@link KafkaTimestampType#CREATE_TIME}
+   * from the record for timestamp. See {@link KafkaIO.Read#withCreateTime(Duration)} for more
+   * complete documentation.
    */
+  static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {
+    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction = record -> {
+      checkArgument(
+        record.getTimestampType() == KafkaTimestampType.CREATE_TIME,
+        "Kafka record's timestamp is not 'CREATE_TIME' "
+        + "(topic: %s, partition %s, offset %s, timestamp type '%s')",
+        record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestampType());
+      return new Instant(record.getTimestamp());
+    };
+
+    return (tp, previousWatermark) ->
+      new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark);
+  }
 
   /**
    * Used by the Read transform to support old timestamp functions API.
    */
   static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
     final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
-                                                         Optional<Instant> previousWatermark) {
-        return new TimestampFnPolicy<>(timestampFn, previousWatermark);
-      }
-    };
+    return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark);
   }
 
   /**
    * A simple policy that uses current time for event time and watermark. This should be used
    * when better timestamps like LogAppendTime are not available for a topic.
    */
-  public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
 
     @Override
     public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
@@ -131,7 +118,7 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
 
     /**
      * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark
@@ -181,7 +168,7 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
    * Internal policy to support deprecated withTimestampFn API. It returns last record
    * timestamp for watermark!.
    */
-  private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+  class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
 
     final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
     Instant lastRecordTimestamp;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
new file mode 100644
index 0000000..04e86a6
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CustomTimestampPolicyWithLimitedDelay}. */
+@RunWith(JUnit4.class)
+public class CustomTimestampPolicyWithLimitedDelayTest {
+
+  // Takes offsets of timestamps from now returns the results as offsets from 'now'.
+  private static List<Long> getTimestampsForRecords(
+    TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
+
+    return timestampOffsets
+      .stream()
+      .map(ts -> {
+        Instant result = policy.getTimestampForRecord(
+          null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
+                                  KafkaTimestampType.CREATE_TIME, "key", "value"));
+        return result.getMillis() - now.getMillis();
+      })
+      .collect(Collectors.toList());
+  }
+
+
+  @Test
+  public void testCustomTimestampPolicyWithLimitedDelay() {
+    // Verifies that max delay is applies appropriately for reporting watermark
+
+    Duration maxDelay = Duration.standardSeconds(60);
+
+    CustomTimestampPolicyWithLimitedDelay<String, String> policy =
+      new CustomTimestampPolicyWithLimitedDelay<>(
+        (record -> new Instant(record.getTimestamp())),
+        maxDelay,
+        Optional.empty());
+
+    Instant now = Instant.now();
+
+    TimestampPolicy.PartitionContext ctx = mock(TimestampPolicy.PartitionContext.class);
+    when(ctx.getMessageBacklog()).thenReturn(100L);
+    when(ctx.getBacklogCheckTime()).thenReturn(now);
+
+    assertThat(policy.getWatermark(ctx), is(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    // (1) Test simple case : watermark == max_timesatmp - max_delay
+
+    List<Long> input = ImmutableList.of(-200_000L,
+                                        -150_000L,
+                                        -120_000L,
+                                        -140_000L,
+                                        -100_000L,  // <<< Max timestamp
+                                        -110_000L);
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be max_timestamp - maxDelay
+    assertThat(policy.getWatermark(ctx), is(now
+                                              .minus(Duration.standardSeconds(100))
+                                              .minus(maxDelay)));
+
+    // (2) Verify future timestamps
+
+    input = ImmutableList.of(-200_000L,
+                             -150_000L,
+                             -120_000L,
+                             -140_000L,
+                              100_000L,  // <<< timestamp is in future
+                             -100_000L,
+                             -110_000L);
+
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be now - max_delay (backlog in context still non zero)
+    assertThat(policy.getWatermark(ctx, now), is(now.minus(maxDelay)));
+
+    // (3) Verify that Watermark advances when there is no backlog
+
+    // advance current time by 5 minutes
+    now = now.plus(Duration.standardSeconds(300));
+    Instant backlogCheckTime = now.minus(Duration.standardSeconds(10));
+
+    when(ctx.getMessageBacklog()).thenReturn(0L);
+    when(ctx.getBacklogCheckTime()).thenReturn(backlogCheckTime);
+
+    assertThat(policy.getWatermark(ctx, now), is(backlogCheckTime.minus(maxDelay)));
+  }
+}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 235a7d9..5819a67 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -151,12 +151,14 @@ public class KafkaIOTest {
   public ExpectedException thrown = ExpectedException.none();
 
   private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+  private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
+  private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
 
   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
       List<String> topics, int partitionsPerTopic, int numElements,
-      OffsetResetStrategy offsetResetStrategy) {
+      OffsetResetStrategy offsetResetStrategy, Map<String, Object> config) {
 
     final List<TopicPartition> partitions = new ArrayList<>();
     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -176,6 +178,10 @@ public class KafkaIOTest {
     int numPartitions = partitions.size();
     final long[] offsets = new long[numPartitions];
 
+    long timestampStartMillis = (Long) config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG,
+                                                           LOG_APPEND_START_TIME.getMillis());
+    TimestampType timestampType = TimestampType.forName((String)
+      config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));
 
     for (int i = 0; i < numElements; i++) {
       int pIdx = i % numPartitions;
@@ -189,8 +195,8 @@ public class KafkaIOTest {
               tp.topic(),
               tp.partition(),
               offsets[pIdx]++,
-              LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
-              TimestampType.LOG_APPEND_TIME,
+              timestampStartMillis + Duration.standardSeconds(i).getMillis(),
+              timestampType,
               0, key.length, value.length, key, value));
     }
 
@@ -277,7 +283,7 @@ public class KafkaIOTest {
 
     @Override
     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
+      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy, config);
     }
   }
 
@@ -498,8 +504,73 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceCustomTimestamps() {
+    // The custom timestamps is set to customTimestampStartMillis + value.
+    // Tests basic functionality of custom timestamps.
+
+    final int numElements = 1000;
+    final long customTimestampStartMillis = 80000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withTimestampPolicyFactory(
+                  (tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay<Integer, Long>(
+                    (record -> new Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
+                                             + customTimestampStartMillis)),
+                   Duration.millis(0),
+                   prevWatermark))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + customTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  @Test
+  public void testUnboundedSourceCreateTimestamps() {
+    // Same as testUnboundedSourceCustomTimestamps with create timestamp.
+
+    final int numElements = 1000;
+    final long createTimestampStartMillis = 50000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withCreateTime(Duration.millis(0))
+                .updateConsumerProperties(ImmutableMap.of(
+                  TIMESTAMP_TYPE_CONFIG, "CreateTime",
+                  TIMESTAMP_START_MILLIS_CONFIG, createTimestampStartMillis))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + createTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
   // Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read from a partition.
-  static class TimestampPolicyWithEndOfSource<K, V> extends TimestampPolicyFactory<K, V> {
+  static class TimestampPolicyWithEndOfSource<K, V> implements TimestampPolicyFactory<K, V> {
     private final long maxOffset;
 
     TimestampPolicyWithEndOfSource(long maxOffset) {
@@ -553,10 +624,9 @@ public class KafkaIOTest {
       .withTimestampPolicyFactory(
         new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));
 
-    PCollection <Long> input =
-      p.apply("readFromKafka", reader.withoutMetadata())
-        .apply(Values.create())
-        .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+    p.apply("readFromKafka", reader.withoutMetadata())
+      .apply(Values.create())
+      .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
 
     PipelineResult result = p.run();
 

-- 
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.