Posted to user@flink.apache.org by Márk Bartos <ma...@evotrex.eu> on 2022/06/09 13:22:19 UTC

Exception: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime | pyflink 1.15.0

Hi,

I'd like to ask for help regarding the java exception:
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')

Full backtrace:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/venv/lib/python3.8/site-packages/pyflink/lib/flink-dist-1.15.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
+------+------------------------+-------+-----+--------+--------------------+
| name |                   type |  null | key | extras |          watermark |
+------+------------------------+-------+-----+--------+--------------------+
|   f0 |               CHAR(36) | FALSE |     |        |                    |
|   f1 |                 BIGINT | FALSE |     |        |                    |
|   f2 |                  FLOAT |  TRUE |     |        |                    |
|   f3 |                  FLOAT |  TRUE |     |        |                    |
|   f4 |                  FLOAT |  TRUE |     |        |                    |
|   f5 |            VARCHAR(64) |  TRUE |     |        |                    |
|   f6 | TIMESTAMP(3) *ROWTIME* | FALSE |     |        | SOURCE_WATERMARK() |
+------+------------------------+-------+-----+--------+--------------------+
7 rows in set
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
|       name |                                                                 type |  null | key | extras | watermark |
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
|         f0 |                                                             CHAR(36) | FALSE |     |        |           |
|   start_ts |                                                               BIGINT | FALSE |     |        |           |
|     end_ts |                                                               BIGINT | FALSE |     |        |           |
| trajectory | ARRAY<ROW<`timestamp_unix` BIGINT, `x` FLOAT, `y` FLOAT, `z` FLOAT>> |  TRUE |     |        |           |
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
4 rows in set
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 598, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581, in _read_inputs
    for elements in elements_iterator:
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "{"created":"@1654778704.584603399","description":"Error received from peer ipv4:127.0.0.1:43123","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 598, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581, in _read_inputs
    for elements in elements_iterator:
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "{"created":"@1654778704.633462921","description":"Error received from peer ipv4:127.0.0.1:41365","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/trajectory_maker/__main__.py", line 29, in <module>
    loop.run_until_complete(main())
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/app/trajectory_maker/__main__.py", line 24, in main
    raise ex
  File "/app/trajectory_maker/__main__.py", line 20, in main
    job_res = exec_env.execute(job_name)
  File "/opt/venv/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 761, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    at akka.dispatch.OnComplete.internal(Future.scala:300)
    at akka.dispatch.OnComplete.internal(Future.scala:297)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
    at jdk.internal.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:47)
    at org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator.emitResult(AbstractTwoInputPythonFunctionOperator.java:121)
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:99)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:274)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:114)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
    ... 13 more

*Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record: (00000000-0000-0000-0000-000000000001,1652186037,10.0,15.0,0.0,,2022-05-10 12:33:57.0)*
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:95)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 24 more
*Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')*
    at org$apache$flink$api$java$tuple$Tuple7$2$Converter.toInternal(Unknown Source)
    at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
    at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
    at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
    at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
    ... 25 more

make: *** [Makefile:69: run-dev] Error 1

-------------------------


* I run a MATCH_RECOGNIZE query on the first table, with f6 as the ORDER BY column, to create the second view.
* The first table is built from a stream whose elements are returned by a .key_by(..).process(MyProcessor()).
* MyProcessor computes f6 with: datetime.utcfromtimestamp(self._state.my_event_ts())

* I am using *pyflink 1.15.0*.
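
For reference, the f6 value in the affected record can be reproduced with plain Python. This is only a sketch of what the conversion above produces; the mapping of the resulting naive datetime to java.sql.Timestamp on the JVM side is inferred from the ClassCastException, not confirmed:

```python
from datetime import datetime

# Epoch seconds taken from the "Affected record" in the trace above.
event_ts = 1652186037

# utcfromtimestamp() returns a *naive* Python datetime; the trace suggests
# this value reaches the Java-side converter as java.sql.Timestamp instead
# of the java.time.LocalDateTime that the TIMESTAMP(3) field expects.
f6 = datetime.utcfromtimestamp(event_ts)
print(f6)  # 2022-05-10 12:33:57
```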

I am not sure whether I am doing something wrong or whether this is a bug in (Py)Flink.
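
One way to sidestep the mismatch, purely as a hypothetical workaround sketch (not verified against PyFlink 1.15), is to avoid sending a Python datetime across the Python/JVM boundary at all: emit the event time as epoch milliseconds (a plain BIGINT) from the process function, and derive the TIMESTAMP(3) rowtime on the Table side, e.g. with TO_TIMESTAMP_LTZ. The Python-side conversion is just:

```python
from datetime import datetime, timezone

def event_time_millis(epoch_seconds: int) -> int:
    """Convert epoch seconds to epoch milliseconds; a plain int crosses
    the boundary as BIGINT, with no datetime conversion involved."""
    return epoch_seconds * 1000

# Round-trip check against the timestamp from the affected record:
millis = event_time_millis(1652186037)
print(datetime.fromtimestamp(millis / 1000, tz=timezone.utc))  # 2022-05-10 12:33:57+00:00
```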

-- 

Sincerely,
Mark

Re: Exception: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime | pyflink 1.15.0

Posted by Dian Fu <di...@gmail.com>.
Hi Mark,

Could you share an example which could reproduce this issue?

Regards,
Dian

On Thu, Jun 9, 2022 at 9:22 PM Márk Bartos <ma...@evotrex.eu> wrote:

> Hi,
>
> I'd like to ask for help regarding the java exception:
> Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot
> be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module
> java.sql of loader 'platform'; java.time.LocalDateTime is in module
> java.base of loader 'bootstrap')
>
> Full backtrace:
>
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/opt/venv/lib/python3.8/site-packages/pyflink/lib/flink-dist-1.15.0.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> +------+------------------------+-------+-----+--------+--------------------+
> | name |                   type |  null | key | extras |
>  watermark |
>
> +------+------------------------+-------+-----+--------+--------------------+
> |   f0 |               CHAR(36) | FALSE |     |        |
>  |
> |   f1 |                 BIGINT | FALSE |     |        |
>  |
> |   f2 |                  FLOAT |  TRUE |     |        |
>  |
> |   f3 |                  FLOAT |  TRUE |     |        |
>  |
> |   f4 |                  FLOAT |  TRUE |     |        |
>  |
> |   f5 |            VARCHAR(64) |  TRUE |     |        |
>  |
> |   f6 | TIMESTAMP(3) *ROWTIME* | FALSE |     |        |
> SOURCE_WATERMARK() |
>
> +------+------------------------+-------+-----+--------+--------------------+
> 7 rows in set
>
> +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
> |       name |
>     type |  null | key | extras | watermark |
>
> +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
> |         f0 |
> CHAR(36) | FALSE |     |        |           |
> |   start_ts |
>   BIGINT | FALSE |     |        |           |
> |     end_ts |
>   BIGINT | FALSE |     |        |           |
> | trajectory | ARRAY<ROW<`timestamp_unix` BIGINT, `x` FLOAT, `y` FLOAT,
> `z` FLOAT>> |  TRUE |     |        |           |
>
> +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
> 4 rows in set
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/threading.py", line 932, in
> _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.8/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 598, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 581, in _read_inputs
>     for elements in elements_iterator:
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
> in __next__
>     return self._next()
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
> in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654778704.584603399","description":"Error received from peer
> ipv4:127.0.0.1:43123","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
> >
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/threading.py", line 932, in
> _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.8/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 598, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 581, in _read_inputs
>     for elements in elements_iterator:
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
> in __next__
>     return self._next()
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
> in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654778704.633462921","description":"Error received from peer
> ipv4:127.0.0.1:41365","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
> >
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/runpy.py", line 194, in
> _run_module_as_main
>     return _run_code(code, main_globals, None,
>   File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
>     exec(code, run_globals)
>   File "/app/trajectory_maker/__main__.py", line 29, in <module>
>     loop.run_until_complete(main())
>   File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in
> run_until_complete
>     return future.result()
>   File "/app/trajectory_maker/__main__.py", line 24, in main
>     raise ex
>   File "/app/trajectory_maker/__main__.py", line 20, in main
>     job_res = exec_env.execute(job_name)
>   File
> "/opt/venv/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py",
> line 761, in execute
>     return
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line
> 1321, in __call__
>     return_value = get_return_value(
>   File "/opt/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py",
> line 146, in deco
>     return f(*a, **kw)
>   File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326,
> in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
> : org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> at akka.dispatch.OnComplete.internal(Future.scala:300)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
> at jdk.internal.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> ... 5 more
> Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
> at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
> ... 14 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:47)
> at org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator.emitResult(AbstractTwoInputPythonFunctionOperator.java:121)
> at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:99)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:274)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:114)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
> ... 13 more
>
> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:(00000000-0000-0000-0000-000000000001,1652186037,10.0,15.0,0.0,,2022-05-10 12:33:57.0)*
> at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:95)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 24 more
> *Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')*
> at org$apache$flink$api$java$tuple$Tuple7$2$Converter.toInternal(Unknown Source)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
> at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
> at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
> ... 25 more
>
> make: *** [Makefile:69: run-dev] Error 1
>
> -------------------------
>
>
> * I run a MATCH_RECOGNIZE query on the first table, creating the second view, with f6 as the ORDER BY column.
> * The first table is built from a stream whose elements are produced by a .key_by(..).process(MyProcessor()) call.
> * MyProcessor produces f6 via: datetime.utcfromtimestamp(self._state.my_event_ts())
>
> * I am using *pyflink 1.15.0*.
>
> I am not sure whether I am doing something wrong or whether this is a bug in (py)flink.
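
[Editor's note: one possible workaround, sketched under assumptions.] The last Caused-by suggests the Java-side converter for the TIMESTAMP(3) column expects java.time.LocalDateTime, while the Python datetime emitted by the ProcessFunction surfaces on the JVM side as java.sql.Timestamp. A way to sidestep the bridging entirely is to emit the event time as plain epoch milliseconds (BIGINT) and derive the rowtime column in the table schema instead. The helper name `event_ts_to_millis` is illustrative, not from the original post; the epoch value 1652186037 is taken from the affected record above, where it corresponds to 2022-05-10 12:33:57 UTC:

```python
from datetime import datetime


def event_ts_to_millis(epoch_seconds):
    """Convert an epoch-seconds event time to epoch milliseconds (int)."""
    return int(round(epoch_seconds * 1000))


# What the ProcessFunction currently emits for f6 -- a naive Python datetime,
# which crosses to the JVM as java.sql.Timestamp:
naive = datetime.utcfromtimestamp(1652186037)   # 2022-05-10 12:33:57

# What it could emit instead -- a plain integer, avoiding the
# Timestamp/LocalDateTime conversion on the Java side:
millis = event_ts_to_millis(1652186037)         # 1652186037000
```

On the Table API side, the TIMESTAMP rowtime could then be derived from that BIGINT field with a computed column, e.g. `Schema.new_builder().column_by_expression("f6", "TO_TIMESTAMP_LTZ(ts_ms, 3)")` plus a watermark on f6 (field name `ts_ms` assumed here). This is a sketch of one approach, not a confirmed fix for the underlying (py)flink behavior.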
>
> --
>
> Sincerely,
> Mark
>