Posted to user@beam.apache.org by Roger <ro...@gmail.com> on 2020/06/19 22:41:10 UTC

window not advancing

Hello.
I am having issues with an unbounded streaming pipeline that uses event-time
processing. I can confirm that all the pieces of the pipeline are running,
including the aggregation. The problem is that it only runs for one window.
This makes me think I've done something wrong with the watermarking and/or
timestamping. The value of each Kafka record is an object whose timestamp is
an epoch string in milliseconds, for example "1592600152163". I have tried
everything I can think of to debug, including adding logging statements, but
can only confirm that things seem to be set correctly. I can see that the
CustomFieldTimePolicy value for currentWatermark is getting set, and I can
print out the aggregated values that ultimately get created as well. For some
reason, though, the window does not advance and I cannot continuously process
the pipeline data. Things just stop.
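
In case it helps, the logging I added was along these lines (a simplified
sketch, not the exact code; "LogEventTimes" is a placeholder step name and
LOG is just an SLF4J logger field):

// Hypothetical debug step, simplified: logs each element's event timestamp
// so it can be compared against the currentWatermark values the policy sets.
.apply("LogEventTimes", ParDo.of(new DoFn<NewRelicRecord, NewRelicRecord>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // c.timestamp() is the element's event-time timestamp.
        LOG.info("element timestamp: {}", c.timestamp());
        c.output(c.element());
    }
}))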

Any suggestions or ideas on what I've done wrong would be very much
appreciated.
Thanks for looking!
Roger



*Approach: Use withTimestampPolicyFactory from org.apache.beam.sdk.io.kafka
(javadoc:
<https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->)
along with WithTimestamps.of.*

pipeline
    .apply("ReadFromKafka", KafkaIO.<Long, NewRelicRecord>read()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializerAndCoder(
            KafkaJsonDeserializer.class,
            SerializableCoder.of(NewRelicRecord.class))
        .withBootstrapServers(options.getKafkaBrokers())
        .withTopic(options.getKafkaTopic())
        .withMaxReadTime(Duration.standardSeconds(
            Integer.parseInt(options.getKafkaReadTimeout())))
        .withConsumerConfigUpdates(ImmutableMap.of(
            "group.id", options.getKafkaGroupId()))
        .withTimestampPolicyFactory((tp, previousWatermark) ->
            new CustomFieldTimePolicy(previousWatermark))
        .withoutMetadata())
    .apply("ExtractPayload", Values.<NewRelicRecord>create())
    .apply("Append event time for PCollection records",
        WithTimestamps.of((NewRelicRecord record) -> {
            long millisecondsFromEpoch = Long.parseLong(record.getTimestamp());
            return new DateTime(millisecondsFromEpoch).toInstant();
        }));

Where CustomFieldTimePolicy looks like this:

public class CustomFieldTimePolicy
        extends TimestampPolicy<Long, NewRelicRecord> {

    /** Current watermark holder. */
    private Instant currentWatermark;

    /**
     * Watermark logic.
     * @param previousWatermark the watermark used to determine the current watermark
     */
    public CustomFieldTimePolicy(final Optional<Instant> previousWatermark) {
        currentWatermark =
            previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public final Instant getTimestampForRecord(
            final PartitionContext ctx,
            final KafkaRecord<Long, NewRelicRecord> record) {
        long millisecondsFromEpoch =
            Long.parseLong(record.getKV().getValue().getTimestamp());
        Instant instant = new DateTime(millisecondsFromEpoch).toInstant();

        // Advance the held watermark to the latest event time seen so far.
        if (instant.isAfter(currentWatermark)) {
            currentWatermark = instant;
        }
        return instant;
    }

    @Override
    public final Instant getWatermark(final PartitionContext ctx) {
        return currentWatermark;
    }
}
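
In case it matters, the windowing/aggregation applied downstream of the
pipeline snippet above is roughly this shape (a simplified sketch, not the
exact code; "records" stands for the PCollection<NewRelicRecord> produced
above):

// Simplified sketch: fixed one-minute event-time windows followed by a
// per-window count. withoutDefaults() is required for a global combine
// over non-global windows.
records
    .apply("Window", Window.<NewRelicRecord>into(
        FixedWindows.of(Duration.standardMinutes(1))))
    .apply("CountPerWindow",
        Combine.globally(Count.<NewRelicRecord>combineFn()).withoutDefaults());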