Posted to user@flink.apache.org by Robert Cullen <ci...@gmail.com> on 2021/03/23 18:40:21 UTC

Pyflink tutorial output

I’m running this script taken from the Flink website: tutorial.py

python tutorial.py

from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('/tmp/output', SimpleStringEncoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

It correctly outputs a part file to the /tmp/output directory when I run it
locally. However, when I run it on my Kubernetes session cluster, there is
no output. Any ideas?

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule tutorial \
--pyFiles /opt/flink-1.12.0/examples/tutorial.py \
--detached
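To check what the sink actually wrote on a given host, a small directory walk is enough. This works on your own machine after a local run, or inside a TaskManager container if its image ships a Python interpreter. The following is a minimal sketch, not part of the Flink tutorial. The `find_part_files` helper is hypothetical, and it assumes StreamingFileSink's default "part" file prefix, with bucket subdirectories under the base path and still-in-progress files kept as hidden dot-files.

```python
import os


def find_part_files(root="/tmp/output"):
    """Return sorted paths of all files under `root` whose names contain "part-".

    Hidden (dot-prefixed) files are included on purpose, because a part
    file that has not been finalized yet may still carry a leading "."
    and an ".inprogress" suffix. The "part-" prefix is an assumption
    based on StreamingFileSink's default naming.
    """
    found = []
    if not os.path.isdir(root):
        return found  # the sink never ran on this host
    # Walk into bucket subdirectories (e.g. one per hour by default).
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if "part-" in name:
                found.append(os.path.join(dirpath, name))
    return sorted(found)


if __name__ == "__main__":
    for path in find_part_files():
        print(path)
```

An empty result on a machine means the sink subtask did not run there. It does not necessarily mean the job failed.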

-- 
Robert Cullen
240-475-4490

Re: Pyflink tutorial output

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Robert,

Have you tried exploring the /tmp/output directory in the TaskManager pods
on your Kubernetes cluster? The StreamingFileSink creates the output
directory on the host of the TaskManager on which the sink tasks are executed.
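The pod inspection suggested above can be scripted. Below is a minimal sketch that shells out to `kubectl` and is not taken from Flink itself. The helper names are hypothetical, and the script assumes that the TaskManager pods of a native Kubernetes session cluster carry the labels `app=<cluster-id>` and `component=taskmanager`. Confirm your pods' actual labels with `kubectl get pods --show-labels` before relying on it. The namespace and cluster id defaults come from the `flink run` command in the original post.

```python
import subprocess


def taskmanager_pods_cmd(namespace, cluster_id):
    """kubectl command that prints the TaskManager pod names, space-separated."""
    # Assumption: native Kubernetes TaskManager pods are labelled
    # app=<cluster-id>,component=taskmanager. Verify with --show-labels.
    selector = f"app={cluster_id},component=taskmanager"
    return ["kubectl", "get", "pods", "-n", namespace, "-l", selector,
            "-o", "jsonpath={.items[*].metadata.name}"]


def list_output_cmd(namespace, pod, path="/tmp/output"):
    """kubectl command that recursively lists `path` inside one pod."""
    # -a also shows hidden (dot-prefixed) in-progress part files.
    return ["kubectl", "exec", "-n", namespace, pod, "--", "ls", "-laR", path]


def show_output(namespace="cmdaa", cluster_id="flink-jobmanager"):
    pods = subprocess.run(
        taskmanager_pods_cmd(namespace, cluster_id),
        check=True, capture_output=True, text=True,
    ).stdout.split()
    for pod in pods:
        print(f"== {pod}", flush=True)
        # check=False: pods that never ran the sink have no /tmp/output,
        # so ls fails there and the loop simply moves on to the next pod.
        subprocess.run(list_output_cmd(namespace, pod), check=False)


if __name__ == "__main__":
    show_output()
```

Because the tutorial job calls `env.set_parallelism(1)`, there is a single sink subtask. You should therefore expect the part files on only one of the TaskManager pods.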

Best,
Shuiqiang

Re: Pyflink tutorial output

Posted by Robert Cullen <ci...@gmail.com>.
Ah, there they are. Thanks!

On Tue, Mar 23, 2021 at 10:26 PM Dian Fu <di...@gmail.com> wrote:

> How did you check the output when submitting to the Kubernetes session
> cluster? I ask because the output should be written to the local
> directory “/tmp/output” on the TaskManagers on which the job is running.
>
> Regards,
> Dian

Re: Pyflink tutorial output

Posted by Dian Fu <di...@gmail.com>.
How did you check the output when submitting to the Kubernetes session cluster? I ask because the output should be written to the local directory “/tmp/output” on the TaskManagers on which the job is running.

Regards,
Dian