Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2023/03/17 15:17:00 UTC
[jira] [Closed] (FLINK-31478) TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-31478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-31478.
---------------------------
Fix Version/s: 1.16.2, 1.18.0, 1.17.1
Resolution: Fixed
Fixed in:
- master via 5e059efee864e17939a33f29272a848d00598531
- release-1.17 via ec5a09b3ce56426d1bdc8eeac4bf52cac9be015b
- release-1.16 via cadf4b35fb6f20c8cba310fa54626d0b9bae1361
> TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31478
> URL: https://issues.apache.org/jira/browse/FLINK-31478
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> {code}
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> import argparse
> import logging
> import sys
>
> from pyflink.common import WatermarkStrategy, Encoder, Types
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
>                                                        OutputFileConfig, RollingPolicy)
>
> word_count_data = ["To be, or not to be,--that is the question:--",
>                    "Whether 'tis nobler in the mind to suffer",
>                    "The slings and arrows of outrageous fortune",
>                    "Or to take arms against a sea of troubles,",
>                    "And by opposing end them?--To die,--to sleep,--",
>                    "No more; and by a sleep to say we end",
>                    "The heartache, and the thousand natural shocks",
>                    "That flesh is heir to,--'tis a consummation",
>                    "Devoutly to be wish'd. To die,--to sleep;--",
>                    "To sleep! perchance to dream:--ay, there's the rub;",
>                    "For in that sleep of death what dreams may come,",
>                    "When we have shuffled off this mortal coil,",
>                    "Must give us pause: there's the respect",
>                    "That makes calamity of so long life;",
>                    "For who would bear the whips and scorns of time,",
>                    "The oppressor's wrong, the proud man's contumely,",
>                    "The pangs of despis'd love, the law's delay,",
>                    "The insolence of office, and the spurns",
>                    "That patient merit of the unworthy takes,",
>                    "When he himself might his quietus make",
>                    "With a bare bodkin? who would these fardels bear,",
>                    "To grunt and sweat under a weary life,",
>                    "But that the dread of something after death,--",
>                    "The undiscover'd country, from whose bourn",
>                    "No traveller returns,--puzzles the will,",
>                    "And makes us rather bear those ills we have",
>                    "Than fly to others that we know not of?",
>                    "Thus conscience does make cowards of us all;",
>                    "And thus the native hue of resolution",
>                    "Is sicklied o'er with the pale cast of thought;",
>                    "And enterprises of great pith and moment,",
>                    "With this regard, their currents turn awry,",
>                    "And lose the name of action.--Soft you now!",
>                    "The fair Ophelia!--Nymph, in thy orisons",
>                    "Be all my sins remember'd."]
>
>
> def word_count(input_path, output_path):
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.BATCH)
>     # write all the data to one file
>     env.set_parallelism(1)
>
>     # define the source
>     if input_path is not None:
>         ds = env.from_source(
>             source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
>                                                        input_path)
>                              .process_static_file_set().build(),
>             watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>             source_name="file_source"
>         )
>     else:
>         print("Executing word_count example with default input data set.")
>         print("Use --input to specify file input.")
>         ds = env.from_collection(word_count_data)
>
>     def split(line):
>         yield from line.split()
>
>     # compute word count
>     ds = ds.flat_map(split) \
>         .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
>         .key_by(lambda i: i[0])
>         # reduce is left commented out so that ds remains a KeyedStream,
>         # which is the case that triggers the bug below
>         # .reduce(lambda i, j: (i[0], i[1] + j[1]))
>
>     # define the sink
>     if output_path is not None:
>         ds.sink_to(
>             sink=FileSink.for_row_format(
>                 base_path=output_path,
>                 encoder=Encoder.simple_string_encoder())
>             .with_output_file_config(
>                 OutputFileConfig.builder()
>                 .with_part_prefix("prefix")
>                 .with_part_suffix(".ext")
>                 .build())
>             .with_rolling_policy(RollingPolicy.default_rolling_policy())
>             .build()
>         )
>         # submit the job; without this call the sink branch never runs
>         env.execute()
>     else:
>         print("Printing result to stdout. Use --output to specify output path.")
>         a = list(ds.execute_and_collect())
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input',
>         dest='input',
>         required=False,
>         help='Input file to process.')
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>
>     word_count(known_args.input, known_args.output)
> {code}
> When the job above is run without --output (so that ds.execute_and_collect() is called directly on the KeyedStream), the following exception is thrown:
> {code}
> Traceback (most recent call last):
> File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 131, in <module>
> word_count(known_args.input, known_args.output)
> File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 110, in word_count
> a = list(ds.execute_and_collect())
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2920, in __next__
> return self.next()
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2931, in next
> return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info)
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 72, in convert_to_python_obj
> fields.append(pickled_bytes_to_python_converter(data, field_type))
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 91, in pickled_bytes_to_python_converter
> data = pickle.loads(data)
> TypeError: a bytes-like object is required, not 'JavaList'
> {code}
> See more details at https://apache-flink.slack.com/archives/C03G7LJTS2G/p1678894062180649
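The last frame of the traceback is pickle.loads(data) receiving a py4j JavaList where it expects pickled bytes. As an illustration of that symptom only (not the PyFlink code path and not the fix), the standard-library sketch below uses a hypothetical JavaList stand-in class which, like the py4j proxy, does not support the buffer protocol, and shows that pickle.loads rejects it with the same message:

```python
import pickle


class JavaList:
    """Hypothetical stand-in for py4j's JavaList proxy (not the real py4j class).

    It does not implement the buffer protocol, so it is not "bytes-like".
    """


def describe_loads_failure(payload):
    """Call pickle.loads on payload; return the TypeError message, or None on success."""
    try:
        pickle.loads(payload)
    except TypeError as exc:
        return str(exc)
    return None


# Properly pickled bytes round-trip as expected.
good = pickle.dumps(("To", 1))
print(pickle.loads(good))

# A non-bytes object is rejected before any unpickling takes place.
print(describe_loads_failure(JavaList()))
```

In other words, the error is about the type of the value handed to pickle.loads, not about the pickled content itself.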
--
This message was sent by Atlassian Jira
(v8.20.10#820010)