Posted to issues@flink.apache.org by "Alex Hall (Jira)" <ji...@apache.org> on 2020/10/20 15:53:00 UTC

[jira] [Updated] (FLINK-19740) Error in to_pandas for table containing event time: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp

     [ https://issues.apache.org/jira/browse/FLINK-19740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alex Hall updated FLINK-19740:
------------------------------
    Description: 
In a nutshell, if I create a table with an event time column:

{{CREATE TABLE simple_table (}}
{{    ts TIMESTAMP(3),}}
{{    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND}}
{{)}}

then calling .to_pandas() on it fails while serializing the table data. This only happens when the WATERMARK line is present and the environment is in streaming mode.
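For context, the WATERMARK clause makes ts an event-time attribute whose watermark trails the largest timestamp seen so far by 5 seconds. The snippet below is a minimal stdlib sketch of that rule under a simplified per-record model; it is not Flink code (Flink may emit watermarks only periodically), and the function name watermarks is hypothetical.

```python
from datetime import datetime, timedelta

# Delay taken from the DDL: WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
WATERMARK_DELAY = timedelta(seconds=5)


def watermarks(event_times):
    """Yield (event_time, watermark) pairs for a stream of event times.

    Simplified model: the watermark is the largest event time seen so far
    minus the delay, so it never moves backwards when events arrive out of
    order.
    """
    max_seen = None
    for ts in event_times:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        yield ts, max_seen - WATERMARK_DELAY


if __name__ == "__main__":
    base = datetime(2020, 10, 20, 12, 0, 0)
    events = [base, base + timedelta(seconds=7), base + timedelta(seconds=3)]
    for ts, wm in watermarks(events):
        print(ts.isoformat(sep=" ", timespec="milliseconds"), "->",
              wm.isoformat(sep=" ", timespec="milliseconds"))
```

The third event (12:00:03) arrives out of order but is still ahead of the watermark (12:00:02), so under this model it would not be considered late.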

Full code:

{{from pyflink.table import EnvironmentSettings, StreamTableEnvironment}}

{{env_settings = (}}
{{    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()}}
{{)}}
{{table_env = StreamTableEnvironment.create(environment_settings=env_settings)}}
{{table_env.execute_sql(}}
{{    """}}
{{    CREATE TABLE simple_table (}}
{{        ts TIMESTAMP(3),}}
{{        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND}}
{{    ) WITH (}}
{{        'connector.type' = 'filesystem',}}
{{        'format.type' = 'csv',}}
{{        'connector.path' = '/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_2.csv'}}
{{    )}}
{{    """}}
{{)}}

{{print(table_env.from_path("simple_table").to_pandas())}}
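The report does not include the contents of scratch_2.csv. To reproduce, a one-column file of TIMESTAMP(3) values is enough; the stdlib sketch below builds hypothetical sample rows. It assumes (not confirmed by the report) that the CSV format accepts the 'yyyy-MM-dd HH:mm:ss.SSS' layout; the helper names sample_csv_text and write_sample_csv are hypothetical.

```python
import csv
import io
from datetime import datetime, timedelta


def sample_csv_text(start, count=5, step=timedelta(seconds=1)):
    """Return `count` one-column CSV rows of TIMESTAMP(3)-style values.

    ASSUMPTION: the 'yyyy-MM-dd HH:mm:ss.SSS' layout is accepted by the CSV
    format for a TIMESTAMP(3) column.
    """
    buf = io.StringIO()
    # Use "\n" instead of the csv module's default "\r\n" line terminator.
    writer = csv.writer(buf, lineterminator="\n")
    for i in range(count):
        ts = start + i * step
        writer.writerow([ts.isoformat(sep=" ", timespec="milliseconds")])
    return buf.getvalue()


def write_sample_csv(path, start, count=5):
    """Write the hypothetical sample rows to `path` (e.g. the connector.path)."""
    with open(path, "w", newline="") as f:
        f.write(sample_csv_text(start, count))


if __name__ == "__main__":
    print(sample_csv_text(datetime(2020, 10, 20, 12, 0, 0), count=3), end="")
```

Pointing 'connector.path' at a file produced by write_sample_csv should give the repro some rows to serialize.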

Output:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", line 20, in <module>
    print(table_env.from_path("simple_table").to_pandas())
  File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 839, in to_pandas
    table = pa.Table.from_batches(serializer.load_from_iterator(batches))
  File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_batches
  File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 76, in load_from_iterator
    reader = pa.ipc.open_stream(
  File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 69, in readinto
    input = self.leftover or (self.itor.next() if self.itor.hasNext() else None)
  File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o39.next.
: java.lang.RuntimeException: Failed to serialize the data of the table
    at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:683)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:663)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp (java.time.LocalDateTime is in module java.base of loader 'bootstrap'; java.sql.Timestamp is in module java.sql of loader 'platform')
    at org.apache.flink.table.data.util.DataFormatConverters$TimestampConverter.toInternalImpl(DataFormatConverters.java:897)
    at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:381)
    at org.apache.flink.table.data.util.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1426)
    at org.apache.flink.table.data.util.DataFormatConverters$RowConverter.toInternalImpl(DataFormatConverters.java:1414)
    at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toInternal(DataFormatConverters.java:381)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$1.next(ArrowUtils.java:655)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$1.next(ArrowUtils.java:641)
    at org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:675)
    ... 12 more
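The root cause line says that DataFormatConverters$TimestampConverter, which expects java.sql.Timestamp, was handed a java.time.LocalDateTime. The stdlib sketch below is only a Python analogy of that mismatch, not Flink code: SqlTimestamp is a hypothetical stand-in for java.sql.Timestamp, a naive datetime plays the role of LocalDateTime, and treating the naive value as UTC in the tolerant converter is an assumption made for this sketch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


class SqlTimestamp:
    """Hypothetical stand-in for java.sql.Timestamp: epoch milliseconds."""

    def __init__(self, epoch_millis):
        self.epoch_millis = epoch_millis


def strict_to_internal(value):
    """Mirrors the failing cast: only the declared external class is accepted."""
    if not isinstance(value, SqlTimestamp):
        raise TypeError(f"{type(value).__name__} cannot be cast to SqlTimestamp")
    return value.epoch_millis


def tolerant_to_internal(value):
    """Accept either representation and return epoch milliseconds.

    ASSUMPTION: a naive datetime (the LocalDateTime analogue) is zone-less
    and is interpreted as UTC here.
    """
    if isinstance(value, SqlTimestamp):
        return value.epoch_millis
    if isinstance(value, datetime) and value.tzinfo is None:
        # Integer timedelta division avoids float rounding of milliseconds.
        return (value - EPOCH) // timedelta(milliseconds=1)
    raise TypeError(f"unsupported timestamp value: {value!r}")
```

Calling strict_to_internal(datetime(2020, 10, 20, 12, 0, 0, 123000)) raises TypeError, just as the Java cast fails, while tolerant_to_internal returns 1603195200123 for the same value.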

> Error in to_pandas for table containing event time: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19740
>                 URL: https://issues.apache.org/jira/browse/FLINK-19740
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Table SQL / API
>    Affects Versions: 1.12.0, 1.11.2
>         Environment: Ubuntu 18.04
> Python 3.8, with a jar built from master the day before the report.
> Or Python 3.7, with the latest PyFlink release (apache-flink) installed from pip.
>            Reporter: Alex Hall
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)