You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/30 16:58:24 UTC
[beam] branch master updated: [BEAM-591] Support custom timestamps
& CreateTime support (#4935)
This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 716a269 [BEAM-591] Support custom timestamps & CreateTime support (#4935)
716a269 is described below
commit 716a269aa76eacc7b4e2e911af8eb878c4364abe
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Fri Mar 30 09:58:19 2018 -0700
[BEAM-591] Support custom timestamps & CreateTime support (#4935)
* Change TimestampPolicyFactory to an interface so that it is
lambda-friendly.
* Add a policy for custom timestamps.
* minor
* add Apache license
* JavaDoc update
* Fix JavaDoc errors
---
sdks/java/io/kafka/pom.xml | 8 ++
.../CustomTimestampPolicyWithLimitedDelay.java | 101 ++++++++++++++++++
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 27 ++++-
.../beam/sdk/io/kafka/TimestampPolicyFactory.java | 73 ++++++-------
.../CustomTimestampPolicyWithLimitedDelayTest.java | 117 +++++++++++++++++++++
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 88 ++++++++++++++--
6 files changed, 358 insertions(+), 56 deletions(-)
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 42bb0e9..c285d01 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -122,11 +122,19 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
new file mode 100644
index 0000000..f2dbbe8
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A policy for custom record timestamps where timestamps within a partition are expected to be
+ * roughly monotonically increasing with a cap on out-of-order event delays (say 1 minute).
+ * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
+ * However, the watermark is never set in the future and is capped at 'now - max delay'. In
+ * addition, the watermark is advanced to 'now - max delay' when a partition is idle.
+ */
+public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
+
+ private final Duration maxDelay;
+ private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;
+ private Instant maxEventTimestamp; // Largest record timestamp seen (seeded in the constructor).
+
+ /**
+ * A policy for custom record timestamps where timestamps are expected to be roughly monotonically
+ * increasing with out-of-order event delays less than {@code maxDelay}. The watermark at any
+ * time is {@code Min(now(), max_event_timestamp) - maxDelay}.
+ * @param timestampFunction A function to extract the timestamp from a record
+ * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+ * record is expected to be after {@code current record timestamp - maxDelay}.
+ * @param previousWatermark Latest check-pointed watermark, see
+ * {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+ */
+ public CustomTimestampPolicyWithLimitedDelay(
+ SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
+ Duration maxDelay,
+ Optional<Instant> previousWatermark) {
+ this.maxDelay = maxDelay;
+ this.timestampFunction = timestampFunction;
+
+ // 'previousWatermark' is not the same as maxEventTimestamp (e.g. it could have been in the future).
+ // Seed it so the watermark before reading any event is previousWatermark (or MIN if absent).
+ maxEventTimestamp = previousWatermark
+ .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)
+ .plus(maxDelay);
+ }
+
+ @Override
+ public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
+ Instant ts = timestampFunction.apply(record);
+ if (ts.isAfter(maxEventTimestamp)) { // Out-of-order records never lower the max.
+ maxEventTimestamp = ts;
+ }
+ return ts;
+ }
+
+ @Override
+ public Instant getWatermark(PartitionContext ctx) {
+ // Watermark == maxEventTime - maxDelay, except in two special cases:
+ // a) maxEventTime in the future : probably due to incorrect timestamps. Cap it to 'now'.
+ // b) partition is idle : Need to advance watermark if there are no records in the partition.
+ // We assume that future records will have timestamp >= 'now - maxDelay' and advance
+ // the watermark accordingly.
+ // The above handles the majority of common use cases for custom timestamps. Users can
+ // implement their own policy if this does not work.
+
+ Instant now = Instant.now();
+ return getWatermark(ctx, now);
+ }
+
+ @VisibleForTesting
+ Instant getWatermark(PartitionContext ctx, Instant now) { // 'now' is a parameter for tests.
+ if (maxEventTimestamp.isAfter(now)) {
+ return now.minus(maxDelay); // (a) above.
+ } else if (
+ ctx.getMessageBacklog() == 0 // Caught up: no message backlog.
+ && ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp) // Idle, (b) above.
+ && maxEventTimestamp.getMillis() > 0) { // Positive max timestamp (read, or seeded on resume).
+ return ctx.getBacklogCheckTime().minus(maxDelay);
+ } else {
+ return maxEventTimestamp.minus(maxDelay);
+ }
+ }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eeb9da9..11ab29c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -111,8 +111,9 @@ import org.slf4j.LoggerFactory;
* // settings for ConsumerConfig. e.g :
* .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
*
- * // set event times and watermark based on LogAppendTime. To provide a custom
+ * // set event times and watermark based on 'LogAppendTime'. To provide a custom
* // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
+ * // Use withCreateTime() with topics that have 'CreateTime' timestamps.
* .withLogAppendTime()
*
* // restrict reader to committed messages on Kafka (see method documentation).
@@ -487,12 +488,11 @@ public class KafkaIO {
return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
}
-
/**
* Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}.
* This is the default timestamp policy. It assigns processing time to each record.
* Specifically, this is the timestamp when the record becomes 'current' in the reader.
- * The watermark aways advances to current time. If servicer side time (log append time) is
+ * The watermark aways advances to current time. If server side time (log append time) is
* enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
*/
public Read<K, V> withProcessingTime() {
@@ -500,10 +500,29 @@ public class KafkaIO {
}
/**
+ * Sets the timestamp policy based on the {@link KafkaTimestampType#CREATE_TIME} timestamp of
+ * the records. It is an error if a record's timestamp type is not
+ * {@link KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to
+ * be roughly monotonically increasing with a cap on out-of-order delays (e.g. 'max delay' of
+ * 1 minute). The watermark at any time is
+ * '({@code Min(now(), Max(event timestamp so far)) - max delay})'. However, the watermark is
+ * never set in the future and is capped at 'now - max delay'. In addition, the watermark is
+ * advanced to 'now - max delay' when a partition is idle.
+ *
+ * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+ * record is expected to be after {@code current record timestamp - maxDelay}.
+ */
+ public Read<K, V> withCreateTime(Duration maxDelay) {
+ return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
+ }
+
+ /**
* Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
* partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
* is invoked for each partition when the reader starts.
- * @see #withLogAppendTime() and {@link #withProcessingTime()}
+ * @see #withLogAppendTime()
+ * @see #withCreateTime(Duration)
+ * @see #withProcessingTime()
*/
public Read<K, V> withTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
index d84bfe8..8feccb6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -16,6 +16,8 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
@@ -31,7 +33,8 @@ import org.joda.time.Instant;
* the the reader while starting or resuming from a checkpoint. Two commonly used policies are
* provided. See {@link #withLogAppendTime()} and {@link #withProcessingTime()}.
*/
-public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializable {
+@FunctionalInterface
+public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable {
/**
* Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while
@@ -42,9 +45,8 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
* is resuming from a checkpoint. This is a good value to return by implementations
* of {@link TimestampPolicy#getWatermark(PartitionContext)} until a better watermark
* can be established as more records are read.
- * @return
*/
- public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+ TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
TopicPartition tp, Optional<Instant> previousWatermark);
/**
@@ -52,14 +54,8 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
* Specifically, this is the timestamp when the record becomes 'current' in the reader.
* The watermark aways advances to current time.
*/
- public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
- return new TimestampPolicyFactory<K, V>() {
- @Override
- public TimestampPolicy<K, V>
- createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
- return new ProcessingTimePolicy<>();
- }
- };
+ static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+ return (tp, prev) -> new ProcessingTimePolicy<>(); // Partition and prior watermark unused.
}
/**
@@ -68,51 +64,42 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
* read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
* See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
*/
- public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
- //return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
- return new TimestampPolicyFactory<K, V>() {
- @Override
- public TimestampPolicy<K, V>
- createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
- return new LogAppendTimePolicy<>(previousWatermark);
- }
- };
+ static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+ return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark); // tp unused.
}
- /*
- * TODO
- * Provide a another built in implementation where the watermark is based on all the timestamps
- * seen in last 1 minute of wall clock time (this duration could be configurable). This is
- * similar to watermark set by PubsubIO.
- *
- * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
- * return withCustomTypestamp(...);
- * }
- *
- * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
- * }
+ /**
+ * Returns a {@link CustomTimestampPolicyWithLimitedDelay} factory that uses each record's
+ * {@link KafkaTimestampType#CREATE_TIME} timestamp; other timestamp types are rejected with an
+ * {@link IllegalArgumentException}. See {@link KafkaIO.Read#withCreateTime(Duration)} for more.
*/
+ static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {
+ SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction = record -> {
+ checkArgument(
+ record.getTimestampType() == KafkaTimestampType.CREATE_TIME,
+ "Kafka record's timestamp is not 'CREATE_TIME' "
+ + "(topic: %s, partition %s, offset %s, timestamp type '%s')",
+ record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestampType());
+ return new Instant(record.getTimestamp()); // Epoch millis from the record.
+ };
+
+ return (tp, previousWatermark) ->
+ new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark);
+ }
/**
* Used by the Read transform to support old timestamp functions API.
*/
static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-
- return new TimestampPolicyFactory<K, V>() {
- @Override
- public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
- Optional<Instant> previousWatermark) {
- return new TimestampFnPolicy<>(timestampFn, previousWatermark);
- }
- };
+ return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark); // Legacy.
}
/**
* A simple policy that uses current time for event time and watermark. This should be used
* when better timestamps like LogAppendTime are not available for a topic.
*/
- public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
+ class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
@Override
public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
@@ -131,7 +118,7 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
* read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
* See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
*/
- public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+ class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
/**
* When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark
@@ -181,7 +168,7 @@ public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializab
* Internal policy to support deprecated withTimestampFn API. It returns last record
* timestamp for watermark!.
*/
- private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+ class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
Instant lastRecordTimestamp;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
new file mode 100644
index 0000000..04e86a6
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CustomTimestampPolicyWithLimitedDelay}. */
+@RunWith(JUnit4.class)
+public class CustomTimestampPolicyWithLimitedDelayTest {
+
+ // Takes timestamps as offsets from 'now' and returns the results as offsets from 'now'.
+ private static List<Long> getTimestampsForRecords(
+ TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
+
+ return timestampOffsets
+ .stream()
+ .map(ts -> {
+ Instant result = policy.getTimestampForRecord(
+ null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
+ KafkaTimestampType.CREATE_TIME, "key", "value"));
+ return result.getMillis() - now.getMillis();
+ })
+ .collect(Collectors.toList());
+ }
+
+
+ @Test
+ public void testCustomTimestampPolicyWithLimitedDelay() {
+ // Verifies that max delay is applied appropriately when reporting the watermark.
+
+ Duration maxDelay = Duration.standardSeconds(60);
+
+ CustomTimestampPolicyWithLimitedDelay<String, String> policy =
+ new CustomTimestampPolicyWithLimitedDelay<>(
+ (record -> new Instant(record.getTimestamp())),
+ maxDelay,
+ Optional.empty()); // No previous watermark.
+
+ Instant now = Instant.now();
+
+ TimestampPolicy.PartitionContext ctx = mock(TimestampPolicy.PartitionContext.class);
+ when(ctx.getMessageBacklog()).thenReturn(100L);
+ when(ctx.getBacklogCheckTime()).thenReturn(now);
+
+ assertThat(policy.getWatermark(ctx), is(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+ // (1) Test simple case : watermark == max_timestamp - max_delay
+
+ List<Long> input = ImmutableList.of(-200_000L,
+ -150_000L,
+ -120_000L,
+ -140_000L,
+ -100_000L, // <<< Max timestamp
+ -110_000L);
+ assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+ // Watermark should be max_timestamp - maxDelay
+ assertThat(policy.getWatermark(ctx), is(now
+ .minus(Duration.standardSeconds(100))
+ .minus(maxDelay)));
+
+ // (2) Verify future timestamps
+
+ input = ImmutableList.of(-200_000L,
+ -150_000L,
+ -120_000L,
+ -140_000L,
+ 100_000L, // <<< timestamp is in the future
+ -100_000L,
+ -110_000L);
+
+ assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+ // Watermark should be now - max_delay (backlog in context is still non-zero)
+ assertThat(policy.getWatermark(ctx, now), is(now.minus(maxDelay)));
+
+ // (3) Verify that the watermark advances when there is no backlog
+
+ // advance current time by 5 minutes
+ now = now.plus(Duration.standardSeconds(300));
+ Instant backlogCheckTime = now.minus(Duration.standardSeconds(10));
+
+ when(ctx.getMessageBacklog()).thenReturn(0L);
+ when(ctx.getBacklogCheckTime()).thenReturn(backlogCheckTime);
+
+ assertThat(policy.getWatermark(ctx, now), is(backlogCheckTime.minus(maxDelay)));
+ }
+}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 235a7d9..5819a67 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -151,12 +151,14 @@ public class KafkaIOTest {
public ExpectedException thrown = ExpectedException.none();
private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+ private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
+ private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
// Update mock consumer with records distributed among the given topics, each with given number
// of partitions. Records are assigned in round-robin order among the partitions.
private static MockConsumer<byte[], byte[]> mkMockConsumer(
List<String> topics, int partitionsPerTopic, int numElements,
- OffsetResetStrategy offsetResetStrategy) {
+ OffsetResetStrategy offsetResetStrategy, Map<String, Object> config) {
final List<TopicPartition> partitions = new ArrayList<>();
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -176,6 +178,10 @@ public class KafkaIOTest {
int numPartitions = partitions.size();
final long[] offsets = new long[numPartitions];
+ long timestampStartMillis = (Long) config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG,
+ LOG_APPEND_START_TIME.getMillis());
+ TimestampType timestampType = TimestampType.forName((String)
+ config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));
for (int i = 0; i < numElements; i++) {
int pIdx = i % numPartitions;
@@ -189,8 +195,8 @@ public class KafkaIOTest {
tp.topic(),
tp.partition(),
offsets[pIdx]++,
- LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
- TimestampType.LOG_APPEND_TIME,
+ timestampStartMillis + Duration.standardSeconds(i).getMillis(),
+ timestampType,
0, key.length, value.length, key, value));
}
@@ -277,7 +283,7 @@ public class KafkaIOTest {
@Override
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
- return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
+ return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy, config);
}
}
@@ -498,8 +504,73 @@ public class KafkaIOTest {
p.run();
}
+ @Test
+ public void testUnboundedSourceCustomTimestamps() {
+ // The custom timestamp is set to customTimestampStartMillis + value (value is in seconds).
+ // Tests basic functionality of custom timestamps.
+
+ final int numElements = 1000;
+ final long customTimestampStartMillis = 80000L;
+
+ PCollection<Long> input =
+ p.apply(mkKafkaReadTransform(numElements, null)
+ .withTimestampPolicyFactory(
+ (tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay<Integer, Long>(
+ (record -> new Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
+ + customTimestampStartMillis)),
+ Duration.millis(0), // maxDelay of zero.
+ prevWatermark))
+ .withoutMetadata())
+ .apply(Values.create());
+
+ addCountingAsserts(input, numElements);
+
+ PCollection<Long> diffs =
+ input
+ .apply(MapElements.into(TypeDescriptors.longs())
+ .via(t -> TimeUnit.SECONDS.toMillis(t) + customTimestampStartMillis))
+ .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+ .apply("DistinctTimestamps", Distinct.create());
+
+ // This assert also confirms that the diff has only one unique value.
+ PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+ p.run();
+ }
+
+ @Test
+ public void testUnboundedSourceCreateTimestamps() {
+ // Same as testUnboundedSourceCustomTimestamps, but using the records' 'CreateTime' timestamps.
+
+ final int numElements = 1000;
+ final long createTimestampStartMillis = 50000L;
+
+ PCollection<Long> input =
+ p.apply(mkKafkaReadTransform(numElements, null)
+ .withCreateTime(Duration.millis(0))
+ .updateConsumerProperties(ImmutableMap.of( // Test-only keys, read by mkMockConsumer.
+ TIMESTAMP_TYPE_CONFIG, "CreateTime",
+ TIMESTAMP_START_MILLIS_CONFIG, createTimestampStartMillis))
+ .withoutMetadata())
+ .apply(Values.create());
+
+ addCountingAsserts(input, numElements);
+
+ PCollection<Long> diffs =
+ input
+ .apply(MapElements.into(TypeDescriptors.longs())
+ .via(t -> TimeUnit.SECONDS.toMillis(t) + createTimestampStartMillis))
+ .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+ .apply("DistinctTimestamps", Distinct.create());
+
+ // This assert also confirms that the diff has only one unique value.
+ PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+ p.run();
+ }
+
// Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read from a partition.
- static class TimestampPolicyWithEndOfSource<K, V> extends TimestampPolicyFactory<K, V> {
+ static class TimestampPolicyWithEndOfSource<K, V> implements TimestampPolicyFactory<K, V> {
private final long maxOffset;
TimestampPolicyWithEndOfSource(long maxOffset) {
@@ -553,10 +624,9 @@ public class KafkaIOTest {
.withTimestampPolicyFactory(
new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));
- PCollection <Long> input =
- p.apply("readFromKafka", reader.withoutMetadata())
- .apply(Values.create())
- .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+ p.apply("readFromKafka", reader.withoutMetadata())
+ .apply(Values.create())
+ .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
PipelineResult result = p.run();
--
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.