Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/08/04 02:25:45 UTC

changing the allowed skew

Hi Colleagues,
I am basically running the code in the WindowedWordCount example.
The only difference is that I don't use TextIO but get records via KafkaIO.
Everything else is the same. I get the following exception.
I would appreciate your suggestions to fix it.
Cheers

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2016-08-04T02:23:21.137Z. Output timestamps must be no earlier than the timestamp of the current input (2016-08-04T02:23:22.896Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)


Re: changing the allowed skew

Posted by Kenneth Knowles <kl...@google.com>.
Hi Amir,

I believe you should use KafkaIO#withTimestampFn [1]. For unbounded
PCollections, the source itself needs to know about the timestamps so it
can maintain a good watermark.
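
As a rough illustration of what such a timestamp function does, here is a
plain-Java sketch (deliberately not the KafkaIO API itself): it maps each
record value to the event time the source should assign. The payload layout
"<epochMillis>,<text>" and the names TimestampFnSketch and EVENT_TIME are
assumptions made up for this example; your records may carry their event time
differently.

```java
import java.time.Instant;
import java.util.function.Function;

class TimestampFnSketch {
    // Hypothetical payload layout: "<epochMillis>,<text>".
    // The function reads the leading epoch-millisecond field and returns it
    // as the record's event time.
    static final Function<String, Instant> EVENT_TIME = value -> {
        int comma = value.indexOf(',');
        return Instant.ofEpochMilli(Long.parseLong(value.substring(0, comma)));
    };

    public static void main(String[] args) {
        // Prints the event time extracted from one sample record.
        System.out.println(EVENT_TIME.apply("1470277402896,hello world"));
    }
}
```

The point of the sketch is only that the timestamp comes from the record
itself, so the source can reason about it, instead of being attached later by
a downstream DoFn.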

The example you are editing uses a bounded input, which has different
implications for the watermark. The text in the comment seems to have
strayed from the example code [2].
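
To illustrate the rule that the exception message states, here is a minimal
plain-Java sketch of the comparison, using the two timestamps from your error.
The class and method names are hypothetical, and this is not the runner's
actual checkTimestamp code, only the inequality the message describes.

```java
import java.time.Duration;
import java.time.Instant;

class SkewCheckSketch {
    // An output timestamp is acceptable when it is no earlier than
    // (input timestamp - allowed skew).
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2016-08-04T02:23:22.896Z");
        Instant output = Instant.parse("2016-08-04T02:23:21.137Z");

        // Zero skew, as in the reported exception.
        System.out.println(isAllowed(output, input, Duration.ZERO));
        // A hypothetical 2-second allowance.
        System.out.println(isAllowed(output, input, Duration.ofSeconds(2)));
    }
}
```

With zero skew the output is rejected because it is 1.759 seconds earlier than
the input; a 2-second allowance would accept it.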

Hope this helps,

Kenn

[1]
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
[2]
https://github.com/apache/incubator-beam/commit/691a5828b45423940e850bbc64ccc5daf7599c76#diff-a1ce476065c2eb973a02d794d29758e0

On Thu, Aug 4, 2016 at 11:38 AM, amir bahmanyari <am...@yahoo.com>
wrote:

> I found in the following method that the timestamp is deliberately
> calculated to fall within the past 2 hours.
> On the other hand, I get the following exception complaining that it is in
> the past!
> I would appreciate any clarification.
>
> public class WindowedWordCount {
>
>   static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
>     private static final Duration RAND_RANGE = Duration.standardHours(2);
>     private final Instant minTimestamp;
>
>     AddTimestampFn() {
>       this.minTimestamp = new Instant(System.currentTimeMillis());
>     }
>
>     @Override
>     public void processElement(ProcessContext c) {
>       // Generate a timestamp that falls somewhere in the past two hours.
>       long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
>       Instant randomTimestamp = minTimestamp.plus(randMillis);
>       /**
>        * Concept #2: Set the data element with that timestamp.
>        */
>       c.outputWithTimestamp(c.element().toString(), new Instant(randomTimestamp));
>     }
>   }
>
>
> ------------------------------
> *From:* amir bahmanyari <am...@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> *Sent:* Wednesday, August 3, 2016 7:25 PM
> *Subject:* changing the allowed skew
>
> Hi Colleagues,
> I am basically running the code in the WindowedWordCount example.
> The only difference is that I don't use TextIO but get records via KafkaIO.
> Everything else is the same. I get the following exception.
> I would appreciate your suggestions to fix it.
> Cheers
>
>
> Caused by: java.lang.IllegalArgumentException: Cannot output with
> timestamp 2016-08-04T02:23:21.137Z. Output timestamps must be no earlier
> than the timestamp of the current input (2016-08-04T02:23:22.896Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew()
> Javadoc for details on changing the allowed skew.
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)

Re: changing the allowed skew

Posted by amir bahmanyari <am...@yahoo.com>.
I found in the following method that the timestamp is deliberately calculated to fall within the past 2 hours.
On the other hand, I get the following exception complaining that it is in the past!
I would appreciate any clarification.

public class WindowedWordCount {

  static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
    private static final Duration RAND_RANGE = Duration.standardHours(2);
    private final Instant minTimestamp;

    AddTimestampFn() {
      this.minTimestamp = new Instant(System.currentTimeMillis());
    }

    @Override
    public void processElement(ProcessContext c) {
      // Generate a timestamp that falls somewhere in the past two hours.
      long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
      Instant randomTimestamp = minTimestamp.plus(randMillis);
      /**
       * Concept #2: Set the data element with that timestamp.
       */
      c.outputWithTimestamp(c.element().toString(), new Instant(randomTimestamp));
    }
  }

From: amir bahmanyari <am...@yahoo.com>
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Sent: Wednesday, August 3, 2016 7:25 PM
Subject: changing the allowed skew

Hi Colleagues,
I am basically running the code in the WindowedWordCount example.
The only difference is that I don't use TextIO but get records via KafkaIO.
Everything else is the same. I get the following exception.
I would appreciate your suggestions to fix it.
Cheers

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2016-08-04T02:23:21.137Z. Output timestamps must be no earlier than the timestamp of the current input (2016-08-04T02:23:22.896Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)