Posted to user@flink.apache.org by kant kodali <ka...@gmail.com> on 2020/03/10 04:01:04 UTC

Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Hi All,

Do I need to call assignTimestampsAndWatermarks if I set my time
characteristic to IngestionTime?

Say I set the time characteristic of my stream execution environment to
ingestion time as follows:

streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Do I still need to call
datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor)?

I thought datastream.assignTimestampsAndWatermarks was mandatory only if
the time characteristic is event time. Is that right? Did this behavior
change in Flink 1.10? I ask because I see libraries that do not call
datastream.assignTimestampsAndWatermarks when the time characteristic is
ingestion time, but do call it for event time. If it is required, how can I
set an AscendingTimestampExtractor in a distributed environment? Is there
any way to generate a monotonically increasing long (as
AscendingTimestampExtractor expects) without any distributed locks?

Thanks!

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Posted by Aljoscha Krettek <al...@apache.org>.
On 10.03.20 10:13, kant kodali wrote:

> If ingestion-time programs cannot handle late data, then why would they
> generate watermarks? Isn't the whole point of watermarks to handle late
> data?

Watermarks are not only used for handling late data. Watermarks are the
mechanism that is used to update time throughout the streaming topology,
starting from the sources. Among other things, they are used to detect
late data.
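A simplified model may help with the "update time throughout the topology" part. The plain-Java sketch below is not Flink code; it assumes an operator with several input channels that remembers the latest watermark received on each channel and advances its own event time only to the minimum across all of them (ignoring details such as idle inputs). MultiInputWatermarkTracker and its methods are hypothetical names used for illustration.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical, simplified model of watermark propagation in an operator with several inputs. */
class MultiInputWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MultiInputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No input has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input channel. Returns the operator's new watermark
     * if it advanced, or empty if it stayed the same.
     */
    OptionalLong onWatermark(int channel, long watermark) {
        // Watermarks from a single channel never move backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator's event time is the minimum over all of its inputs.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        MultiInputWatermarkTracker op = new MultiInputWatermarkTracker(2);
        System.out.println(op.onWatermark(0, 5));  // OptionalLong.empty: channel 1 has not reported yet
        System.out.println(op.onWatermark(1, 3));  // OptionalLong[3]
        System.out.println(op.onWatermark(0, 8));  // OptionalLong.empty: still held back by channel 1
        System.out.println(op.onWatermark(1, 10)); // OptionalLong[8]
    }
}
```

The slowest input holds back event time for the whole operator, which is how time flows downstream from the sources without any coordination beyond the watermarks themselves.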

When you set the characteristic to "ingestion time", you are essentially
installing a timestamp and watermark extractor that uses the current
processing time at the sources as the event time.
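A rough model of that behavior, again in plain Java and not Flink's actual implementation: each record is stamped with a clock reading when it enters the source, and the watermark trails the latest stamp by one millisecond, because a watermark for time t promises that no further records with timestamp <= t will follow and the next record may carry the same millisecond. IngestionTimeSource and its methods are hypothetical, and the clock is injected as a LongSupplier so the example is deterministic. As long as the clock never goes backwards, no record can arrive behind the watermark.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical model of ingestion-time stamping at a source (not Flink's implementation).
 * Assumes the injected clock never goes backwards.
 */
class IngestionTimeSource<T> {
    /** A record together with the timestamp it received on arrival. */
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final LongSupplier clock;
    private long latestTimestamp = Long.MIN_VALUE;

    IngestionTimeSource(LongSupplier clock) {
        this.clock = clock;
    }

    /** Stamps an incoming record with the current clock reading, used as its event time. */
    Stamped<T> ingest(T value) {
        long now = clock.getAsLong();
        latestTimestamp = Math.max(latestTimestamp, now);
        return new Stamped<>(value, now);
    }

    /** Watermark for time t: no further records with timestamp <= t are expected. */
    long currentWatermark() {
        // The next record may carry the same millisecond as the latest one, hence "- 1".
        return latestTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : latestTimestamp - 1;
    }

    /** Feeds one record per clock reading and counts records stamped at or behind the watermark. */
    static int countLate(long... clockReadings) {
        int[] next = {0};
        IngestionTimeSource<Integer> source = new IngestionTimeSource<>(() -> clockReadings[next[0]++]);
        int late = 0;
        for (int i = 0; i < clockReadings.length; i++) {
            long watermarkBefore = source.currentWatermark();
            Stamped<Integer> record = source.ingest(i);
            if (record.timestamp <= watermarkBefore) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println(countLate(100, 100, 105)); // 0: monotonic clock, nothing is late
        System.out.println(countLate(100, 90));       // 1: the clock stepped backwards
    }
}
```

The second call simulates a clock stepping backwards, which shows that the model's "no late data" property rests entirely on the assumption of a non-decreasing clock at the source.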

> My last question was more about this library
> <https://github.com/vasia/gelly-streaming>. I run several algorithms using
> SimpleEdgeStream.aggregate(<algorithm>).print(), and I run into the
> following error whenever I invoke this constructor
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>.
> But it works if I change it to this one
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>,
> so I am not exactly sure what is happening there.

I don't know what is going on here. Could it be that the library
internally sets the characteristic to event time, thereby overriding
your ingestion-time setting? In that case you would indeed be missing a
watermark extractor. I'm cc'ing Vasia, the author of that library.
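For context on the error itself: the exception in the stack trace is thrown by TumblingEventTimeWindows.assignWindows, which needs a real timestamp to decide which window a record belongs to. The sketch below is a simplified plain-Java model of that check, assuming (as in Flink 1.10's TumblingEventTimeWindows and TimeWindow.getWindowStartWithOffset) that Long.MIN_VALUE marks a record without a timestamp and that the window start is timestamp - (timestamp - offset + size) % size. TumblingWindowModel is a hypothetical name. If event time is in effect but nothing upstream assigned timestamps, records reach the assigner carrying the marker and are rejected.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of event-time tumbling window assignment. */
class TumblingWindowModel {
    /** Start of the window that contains the timestamp, for the given window size and offset. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Returns {start, end} of the window for a record, or fails if it has no timestamp. */
    static long[] assign(long timestamp, long size) {
        if (timestamp == Long.MIN_VALUE) {
            // Long.MIN_VALUE is the "no timestamp" marker: nothing upstream assigned event time.
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker)");
        }
        long start = windowStart(timestamp, 0, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(assign(7, 5))); // [5, 10]
        try {
            assign(Long.MIN_VALUE, 5);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```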

-Aljoscha

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Posted by David Anderson <da...@ververica.com>.
Watermarks are a tool for handling out-of-orderness when working with event
time timestamps. They provide a mechanism for managing the tradeoff between
latency and completeness, allowing you to manage how long to wait for any
out-of-orderness to resolve itself. Note that in the way Flink uses these
terms, out-of-orderness is not the same as lateness: your watermarking will
accommodate a certain amount of out-of-orderness, and out-of-order events
that arrive within this timeframe are not considered late. Only events that
are excessively out-of-order -- i.e., with timestamps behind the current
watermark -- are late.

I would say that the documentation you quoted is a bit misleading, since
with ingestion time processing there can be no late events.

Most of the Flink runtime only makes a distinction between processing time
and event time. For example, there are processing time timers (triggered by
the system clock) and event time timers (triggered by watermarks), but
there's no such thing as an ingestion time timer. Ingestion time is a
hybrid between the two that assigns timestamps and watermarks based on
processing time, and then the rest of the pipeline behaves as though you
were doing event time processing.
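The difference between the two kinds of timers can be modeled as follows. This plain-Java sketch is hypothetical and leaves out keys, deduplication and checkpointing; it only shows that an event-time timer for time t fires once the watermark reaches t, independent of the wall clock. Under ingestion time, the watermarks that drive such timers are derived from processing time at the sources, but the timers themselves are ordinary event-time timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of event-time timers: they fire when the watermark reaches them. */
class EventTimeTimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances event time and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        // A timer for time t fires once the watermark is >= t; the system clock plays no part.
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel timers = new EventTimeTimerModel();
        timers.registerTimer(15);
        timers.registerTimer(5);
        timers.registerTimer(10);
        System.out.println(timers.advanceWatermark(10)); // [5, 10]
        System.out.println(timers.advanceWatermark(12)); // []
        System.out.println(timers.advanceWatermark(20)); // [15]
    }
}
```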

This means that when working with ingestion time you lose most of the
benefits of event time processing, such as deterministic, reproducible
behavior. But using ingestion time does make it possible to use certain
parts of the APIs that are described as "event time only", such as interval
joins.
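As an illustration of why interval joins need timestamps, here is a plain-Java model of the matching rule that Flink's documentation gives for KeyedStream.intervalJoin(...).between(lowerBound, upperBound): a left element a matches a right element b when a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound, with both bounds inclusive by default. The sketch ignores keys, buffering and state cleanup, and IntervalJoinModel is a hypothetical name. The rule is stated purely on record timestamps, so it works as long as every record carries one, which ingestion time provides.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the interval-join matching rule on timestamps alone. */
class IntervalJoinModel {
    /**
     * Pairs each left timestamp a with every right timestamp b such that
     * a + lowerBound <= b <= a + upperBound (both bounds inclusive).
     */
    static List<long[]> join(long[] left, long[] right, long lowerBound, long upperBound) {
        List<long[]> pairs = new ArrayList<>();
        for (long a : left) {
            for (long b : right) {
                if (a + lowerBound <= b && b <= a + upperBound) {
                    pairs.add(new long[] {a, b});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        long[] left = {0, 10};
        long[] right = {-1, 1, 2, 9, 12};
        // Prints "0 joins -1", "0 joins 1", "10 joins 9".
        for (long[] p : join(left, right, -2, 1)) {
            System.out.println(p[0] + " joins " + p[1]);
        }
    }
}
```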

I don't know enough about gelly-streaming to speculate about what's going
on there.

David



On Tue, Mar 10, 2020 at 10:14 AM kant kodali <ka...@gmail.com> wrote:

> Hi Arvid,
>
> If ingestion time programs cannot handle late data then why would it
> generate watermarks? Isn't the whole point of watermarks is to handle the
> late data?
>
> My last question was more about this library
> <https://github.com/vasia/gelly-streaming> I run several algorithms using
> SimpleEdgeStream.aggregrate(<algoirthm>).print() and I am running into
> the following error whenever I invoke the following constructor
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69> .
> But it works if I change it to this
> <https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86> so
> I am not exactly sure what is happening there.
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at Test.main(Test.java:86)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>
> ... 19 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
> (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>
> at
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> On Tue, Mar 10, 2020 at 1:40 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> I just saw that asked the same question on SO [1]. Could you, in the
>> future, please cross-reference these posts, so that we don't waste
>> resources on answering?
>>
>> [1]
>> https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t
>>
>> On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Kant,
>>>
>>> according to the documentation [1], you don't need to set a watermark
>>> assigner:
>>>
>>>> Compared to *event time*, *ingestion time* programs cannot handle any
>>>> out-of-order events or late data, but the programs don’t have to specify
>>>> how to generate *watermarks*.
>>>>
>>>> Internally, *ingestion time* is treated much like *event time*, but
>>>> with automatic timestamp assignment and automatic watermark generation.
>>>>
>>>
>>> So it's neither possible to assign timestamps nor watermark, but it
>>> seems as if the default behavior is exactly as you want it to be. If that
>>> doesn't work for you, could you please rephrase your last question or
>>> describe your use case? I didn't get it.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>>>
>>> On Tue, Mar 10, 2020 at 5:01 AM kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Do I need to set assignTimestampsAndWatermarks if I set my time
>>>> characteristic to IngestionTime?
>>>>
>>>> say I set my time characteristic of stream execution environment to
>>>> Ingestion time as follows
>>>>
>>>>
>>>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>
>>>> do I need to call
>>>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>>>
>>>> I thought datastream.assignTimestampsAndWatermarks is mandatory only
>>>> if time characteristic is event time. No? Did this behavior change in Flink
>>>> 1.10? because I see libraries not setting
>>>> datastream.assignTimestampsAndWatermarks when time characteristic is
>>>> Ingestion time but they do for event time. If not, I am wondering how can I
>>>> set AscendingTimestampExtractor in a distributed environment? is there
>>>> anyway to add monotonically increasing long(AscendingTimestampExtractor)
>>>> without any distributed locks?
>>>>
>>>> Thanks!
>>>>
>>>

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Posted by kant kodali <ka...@gmail.com>.
Hi Arvid,

If ingestion-time programs cannot handle late data, then why would they
generate watermarks? Isn't the whole point of watermarks to handle late
data?

My last question was more about this library
<https://github.com/vasia/gelly-streaming>. I run several algorithms using
SimpleEdgeStream.aggregate(<algorithm>).print(), and I run into the
following error whenever I invoke this constructor
<https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L69>.
But it works if I change it to this one
<https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SimpleEdgeStream.java#L86>,
so I am not exactly sure what is happening there.

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at Test.main(Test.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

On Tue, Mar 10, 2020 at 1:40 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Kant,
>
> I just saw that asked the same question on SO [1]. Could you, in the
> future, please cross-reference these posts, so that we don't waste
> resources on answering?
>
> [1]
> https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t
>
> On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Kant,
>>
>> according to the documentation [1], you don't need to set a watermark
>> assigner:
>>
>>> Compared to *event time*, *ingestion time* programs cannot handle any
>>> out-of-order events or late data, but the programs don’t have to specify
>>> how to generate *watermarks*.
>>>
>>> Internally, *ingestion time* is treated much like *event time*, but
>>> with automatic timestamp assignment and automatic watermark generation.
>>>
>>
>> So it's neither possible to assign timestamps nor watermark, but it seems
>> as if the default behavior is exactly as you want it to be. If that doesn't
>> work for you, could you please rephrase your last question or describe your
>> use case? I didn't get it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>>
>> On Tue, Mar 10, 2020 at 5:01 AM kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Do I need to set assignTimestampsAndWatermarks if I set my time
>>> characteristic to IngestionTime?
>>>
>>> say I set my time characteristic of stream execution environment to
>>> Ingestion time as follows
>>>
>>>
>>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>
>>> do I need to call
>>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>>
>>> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
>>> time characteristic is event time. No? Did this behavior change in Flink
>>> 1.10? because I see libraries not setting
>>> datastream.assignTimestampsAndWatermarks when time characteristic is
>>> Ingestion time but they do for event time. If not, I am wondering how can I
>>> set AscendingTimestampExtractor in a distributed environment? is there
>>> anyway to add monotonically increasing long(AscendingTimestampExtractor)
>>> without any distributed locks?
>>>
>>> Thanks!
>>>
>>

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kant,

I just saw that you asked the same question on SO [1]. In the future,
could you please cross-reference these posts, so that we don't waste
resources answering the same question twice?

[1]
https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t

On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Kant,
>
> according to the documentation [1], you don't need to set a watermark
> assigner:
>
>> Compared to *event time*, *ingestion time* programs cannot handle any
>> out-of-order events or late data, but the programs don’t have to specify
>> how to generate *watermarks*.
>>
>> Internally, *ingestion time* is treated much like *event time*, but with
>> automatic timestamp assignment and automatic watermark generation.
>>
>
> So it's neither possible to assign timestamps nor watermark, but it seems
> as if the default behavior is exactly as you want it to be. If that doesn't
> work for you, could you please rephrase your last question or describe your
> use case? I didn't get it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>
> On Tue, Mar 10, 2020 at 5:01 AM kant kodali <ka...@gmail.com> wrote:
>
>> Hi All,
>>
>> Do I need to set assignTimestampsAndWatermarks if I set my time
>> characteristic to IngestionTime?
>>
>> say I set my time characteristic of stream execution environment to
>> Ingestion time as follows
>>
>>
>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>
>> do I need to call
>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>
>> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
>> time characteristic is event time. No? Did this behavior change in Flink
>> 1.10? because I see libraries not setting
>> datastream.assignTimestampsAndWatermarks when time characteristic is
>> Ingestion time but they do for event time. If not, I am wondering how can I
>> set AscendingTimestampExtractor in a distributed environment? is there
>> anyway to add monotonically increasing long(AscendingTimestampExtractor)
>> without any distributed locks?
>>
>> Thanks!
>>
>

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kant,

according to the documentation [1], you don't need to set a watermark
assigner:

> Compared to *event time*, *ingestion time* programs cannot handle any
> out-of-order events or late data, but the programs don’t have to specify
> how to generate *watermarks*.
>
> Internally, *ingestion time* is treated much like *event time*, but with
> automatic timestamp assignment and automatic watermark generation.
>

So it's not necessary to assign timestamps or watermarks; the default
behavior seems to be exactly what you want. If that doesn't work for you,
could you please rephrase your last question or describe your use case?
I didn't understand it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html

On Tue, Mar 10, 2020 at 5:01 AM kant kodali <ka...@gmail.com> wrote:

> Hi All,
>
> Do I need to set assignTimestampsAndWatermarks if I set my time
> characteristic to IngestionTime?
>
> say I set my time characteristic of stream execution environment to
> Ingestion time as follows
>
>
> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> do I need to call
> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>
> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
> time characteristic is event time. No? Did this behavior change in Flink
> 1.10? because I see libraries not setting
> datastream.assignTimestampsAndWatermarks when time characteristic is
> Ingestion time but they do for event time. If not, I am wondering how can I
> set AscendingTimestampExtractor in a distributed environment? is there
> anyway to add monotonically increasing long(AscendingTimestampExtractor)
> without any distributed locks?
>
> Thanks!
>