Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2023/03/17 15:17:00 UTC

[jira] [Closed] (FLINK-31478) TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream

     [ https://issues.apache.org/jira/browse/FLINK-31478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-31478.
---------------------------
    Fix Version/s: 1.16.2
                   1.18.0
                   1.17.1
       Resolution: Fixed

Fixed in:
- master via 5e059efee864e17939a33f29272a848d00598531
- release-1.17 via ec5a09b3ce56426d1bdc8eeac4bf52cac9be015b
- release-1.16 via cadf4b35fb6f20c8cba310fa54626d0b9bae1361

> TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31478
>                 URL: https://issues.apache.org/jira/browse/FLINK-31478
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> {code}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> #  limitations under the License.
> ################################################################################
> import argparse
> import logging
> import sys
> from pyflink.common import WatermarkStrategy, Encoder, Types
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
>                                                        OutputFileConfig, RollingPolicy)
> word_count_data = ["To be, or not to be,--that is the question:--",
>                    "Whether 'tis nobler in the mind to suffer",
>                    "The slings and arrows of outrageous fortune",
>                    "Or to take arms against a sea of troubles,",
>                    "And by opposing end them?--To die,--to sleep,--",
>                    "No more; and by a sleep to say we end",
>                    "The heartache, and the thousand natural shocks",
>                    "That flesh is heir to,--'tis a consummation",
>                    "Devoutly to be wish'd. To die,--to sleep;--",
>                    "To sleep! perchance to dream:--ay, there's the rub;",
>                    "For in that sleep of death what dreams may come,",
>                    "When we have shuffled off this mortal coil,",
>                    "Must give us pause: there's the respect",
>                    "That makes calamity of so long life;",
>                    "For who would bear the whips and scorns of time,",
>                    "The oppressor's wrong, the proud man's contumely,",
>                    "The pangs of despis'd love, the law's delay,",
>                    "The insolence of office, and the spurns",
>                    "That patient merit of the unworthy takes,",
>                    "When he himself might his quietus make",
>                    "With a bare bodkin? who would these fardels bear,",
>                    "To grunt and sweat under a weary life,",
>                    "But that the dread of something after death,--",
>                    "The undiscover'd country, from whose bourn",
>                    "No traveller returns,--puzzles the will,",
>                    "And makes us rather bear those ills we have",
>                    "Than fly to others that we know not of?",
>                    "Thus conscience does make cowards of us all;",
>                    "And thus the native hue of resolution",
>                    "Is sicklied o'er with the pale cast of thought;",
>                    "And enterprises of great pith and moment,",
>                    "With this regard, their currents turn awry,",
>                    "And lose the name of action.--Soft you now!",
>                    "The fair Ophelia!--Nymph, in thy orisons",
>                    "Be all my sins remember'd."]
> def word_count(input_path, output_path):
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.BATCH)
>     # write all the data to one file
>     env.set_parallelism(1)
>     # define the source
>     if input_path is not None:
>         ds = env.from_source(
>             source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
>                                                        input_path)
>                              .process_static_file_set().build(),
>             watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>             source_name="file_source"
>         )
>     else:
>         print("Executing word_count example with default input data set.")
>         print("Use --input to specify file input.")
>         ds = env.from_collection(word_count_data)
>     def split(line):
>         yield from line.split()
>     # compute word count
>     ds = ds.flat_map(split) \
>            .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
>            .key_by(lambda i: i[0])
>            # reduce is left commented out so that ds stays a KeyedStream:
>            # .reduce(lambda i, j: (i[0], i[1] + j[1]))
>     # define the sink
>     if output_path is not None:
>         ds.sink_to(
>             sink=FileSink.for_row_format(
>                 base_path=output_path,
>                 encoder=Encoder.simple_string_encoder())
>             .with_output_file_config(
>                 OutputFileConfig.builder()
>                 .with_part_prefix("prefix")
>                 .with_part_suffix(".ext")
>                 .build())
>             .with_rolling_policy(RollingPolicy.default_rolling_policy())
>             .build()
>         )
>     else:
>         print("Printing result to stdout. Use --output to specify output path.")
>         a = list(ds.execute_and_collect())
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input',
>         dest='input',
>         required=False,
>         help='Input file to process.')
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     word_count(known_args.input, known_args.output)
> {code}
> Running the above job throws the following exception:
> {code}
> Traceback (most recent call last):
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 131, in <module>
>     word_count(known_args.input, known_args.output)
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 110, in word_count
>     a = list(ds.execute_and_collect())
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2920, in __next__
>     return self.next()
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2931, in next
>     return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info)
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 72, in convert_to_python_obj
>     fields.append(pickled_bytes_to_python_converter(data, field_type))
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 91, in pickled_bytes_to_python_converter
>     data = pickle.loads(data)
> TypeError: a bytes-like object is required, not 'JavaList'
> {code}
> For more details, see https://apache-flink.slack.com/archives/C03G7LJTS2G/p1678894062180649
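
The last frame of the traceback can be illustrated without a Flink installation. The following is only a minimal sketch: it assumes that the failure comes down to pickle.loads() receiving an object that is not bytes-like. The JavaList class and the try_unpickle helper below are hypothetical stand-ins written for this illustration; they are not the py4j proxy class or any PyFlink function, and the sketch does not show how the issue was fixed.

```python
import pickle


class JavaList:
    """Hypothetical stand-in for a Java list proxy; NOT the real py4j class."""

    def __init__(self, items):
        self.items = list(items)


def try_unpickle(data):
    """Hypothetical helper: call pickle.loads and report a TypeError as text."""
    try:
        return pickle.loads(data)
    except TypeError as exc:
        return "TypeError: {}".format(exc)


# A real bytes payload round-trips through pickle without trouble.
ok = try_unpickle(pickle.dumps(("to", 1)))

# An object that does not support the buffer protocol is rejected by
# pickle.loads before any unpickling happens.
failed = try_unpickle(JavaList([b"to", b"1"]))

print(ok)
print(failed)
```

The first call returns the original tuple, while the second is rejected with a TypeError that names the offending type, which matches the shape of the error in the report.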



--
This message was sent by Atlassian Jira
(v8.20.10#820010)