Posted to user@flink.apache.org by Xander Song <ia...@gmail.com> on 2020/02/06 23:54:02 UTC

Running a Beam Pipeline on GCP Dataproc Flink Cluster

I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I
have followed the instructions at this repo
<https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink>
to create a Flink cluster on Dataproc using an initialization action. However,
the resulting cluster uses version 1.5.6 of Flink, and my project requires
a more recent version (version 1.7, 1.8, or 1.9) for compatibility with Beam
<https://beam.apache.org/documentation/runners/flink/>.

Inside of the flink.sh script in the linked repo, there is a line for
installing Flink from a snapshot URL instead of apt
<https://github.com/GoogleCloudDataproc/initialization-actions/blob/81e453d8f8a036e371e144d5103aaa38ecb2c679/flink/flink.sh#L53>.
Is this the correct mechanism for installing a different version of Flink
using the initialization script? If so, how is it meant to be used?

Thank you in advance.

Fwd: Running a Beam Pipeline on GCP Dataproc Flink Cluster

Posted by Paweł Kordek <pa...@outlook.com>.
________________________________
From: Paweł Kordek <pa...@outlook.com>
Sent: Saturday, February 8, 2020, 08:48
To: Xander Song
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

It's because 1.9.1 is no longer hosted under this particular link; you can change it to 1.9.2. By the way, this link was just an example (sorry I wasn't clear enough). You should go to the Flink releases page and get a link from there, not necessarily for the same mirror.
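
The trace quoted further down is consistent with this: curl fetched only 259 bytes, and tar then reported "not in gzip format", which suggests the mirror returned a small error page rather than the archive. Below is a minimal sketch of a check to run before creating the cluster, assuming curl, od, and mktemp are available locally. The function names is_gzip_file and check_flink_url are hypothetical, and the example URL in the final comment is an assumption that should be confirmed on the Flink downloads page.

```shell
#!/usr/bin/env bash
# Sketch: confirm that a URL really serves a gzip archive before passing it
# to the initialization action via the flink-snapshot-url metadata key.

# Returns 0 if the file starts with the gzip magic bytes 1f 8b.
is_gzip_file() {
  local path
  local magic
  path="$1"
  magic="$(head -c 2 "$path" | od -An -tx1 | tr -d ' \n')"
  [ "$magic" = "1f8b" ]
}

# Downloads the URL to a temporary file and reports whether it is gzip data.
# curl -f turns HTTP errors (such as 404) into a failure instead of saving
# the server's error page as if it were the archive.
check_flink_url() {
  local url
  local tmp
  url="$1"
  tmp="$(mktemp)"
  if curl -fsSL -o "$tmp" "$url" && is_gzip_file "$tmp"; then
    echo "OK: $url"
    rm -f "$tmp"
    return 0
  fi
  echo "Not a gzip archive (or download failed): $url" >&2
  rm -f "$tmp"
  return 1
}

# Example call (assumed URL; take the real one from the Flink downloads page):
# check_flink_url "https://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz"
```

If the check fails, the same URL would most likely make the initialization action fail as well, so it seems worth running it first.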

Cheers
Paweł

________________________________
From: Xander Song <ia...@gmail.com>
Sent: Saturday, February 8, 2020 6:40:26 AM
To: Paweł Kordek <pa...@outlook.com>
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

Thanks for your suggestion. I tried to add the suggested flag, but now cluster creation fails. I executed


REGION=us-west1
CLUSTER_NAME=test-cluster
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/flink/flink.sh \
    --metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

at the command line. I received the following terminal output.



Waiting on operation [projects/innate-life-265704/regions/us-west1/operations/b44911f9-3bca-3d8b-9cb7-897a24e1f3f6].

Waiting for cluster creation operation...⠶

WARNING: For PD-Standard without local SSDs, we strongly recommend provisioning 1TB or larger to ensure consistently high I/O performance. See https://cloud.google.com/compute/docs/disks/performance for information on disk I/O performance.

Waiting for cluster creation operation...⠶

WARNING: Cluster test-cluster failed to create. Beginning automated resource cleanup process.

Waiting for cluster creation operation...done.

ERROR: (gcloud.dataproc.clusters.create) Operation [projects/innate-life-265704/regions/us-west1/operations/b44911f9-3bca-3d8b-9cb7-897a24e1f3f6] failed: Initialization action failed. Failed action 'gs://goog-dataproc-initialization-actions-us-west1/flink/flink.sh', see output in: gs://dataproc-bb4bc21b-9947-4fd7-bb15-f3e1a696483c-us-west1/google-cloud-dataproc-metainfo/696b98b6-afcd-4f7c-b566-4fdab6fe9374/test-cluster-m/dataproc-initialization-script-0_output.




The contents of the output file were:




-b566-4fdab6fe9374/test-cluster-m/dataproc-initialization-script-0_output

+ export PATH=/usr/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

+ PATH=/usr/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

+ readonly FLINK_INSTALL_DIR=/usr/lib/flink

+ FLINK_INSTALL_DIR=/usr/lib/flink

+ readonly FLINK_WORKING_DIR=/var/lib/flink

+ FLINK_WORKING_DIR=/var/lib/flink

+ readonly FLINK_YARN_SCRIPT=/usr/bin/flink-yarn-daemon

+ FLINK_YARN_SCRIPT=/usr/bin/flink-yarn-daemon

+ readonly FLINK_WORKING_USER=yarn

+ FLINK_WORKING_USER=yarn

+ readonly HADOOP_CONF_DIR=/etc/hadoop/conf

+ HADOOP_CONF_DIR=/etc/hadoop/conf

+ readonly FLINK_NETWORK_NUM_BUFFERS=2048

+ FLINK_NETWORK_NUM_BUFFERS=2048

+ readonly FLINK_JOBMANAGER_MEMORY_FRACTION=1.0

+ FLINK_JOBMANAGER_MEMORY_FRACTION=1.0

+ readonly FLINK_TASKMANAGER_MEMORY_FRACTION=1.0

+ FLINK_TASKMANAGER_MEMORY_FRACTION=1.0

+ readonly START_FLINK_YARN_SESSION_METADATA_KEY=flink-start-yarn-session

+ START_FLINK_YARN_SESSION_METADATA_KEY=flink-start-yarn-session

+ readonly START_FLINK_YARN_SESSION_DEFAULT=true

+ START_FLINK_YARN_SESSION_DEFAULT=true

+ readonly FLINK_SNAPSHOT_URL_METADATA_KEY=flink-snapshot-url

+ FLINK_SNAPSHOT_URL_METADATA_KEY=flink-snapshot-url

+ main

+ local role

++ /usr/share/google/get_metadata_value attributes/dataproc-role

+ role=Master

+ /usr/share/google/get_metadata_value attributes/flink-snapshot-url

http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz+ install_flink_snapshot

+ local work_dir

++ mktemp -d

+ work_dir=/tmp/tmp.6vPgP5mYq4

+ local flink_url

++ /usr/share/google/get_metadata_value attributes/flink-snapshot-url

+ flink_url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

+ local flink_local=/tmp/tmp.6vPgP5mYq4/flink.tgz

+ local 'flink_toplevel_pattern=/tmp/tmp.6vPgP5mYq4/flink-*'

+ pushd /tmp/tmp.6vPgP5mYq4

/tmp/tmp.6vPgP5mYq4 /

+ curl -o /tmp/tmp.6vPgP5mYq4/flink.tgz http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   259  100   259    0     0    440      0 --:--:-- --:--:-- --:--:--   440

+ tar -xzvf /tmp/tmp.6vPgP5mYq4/flink.tgz


gzip: stdin: not in gzip format

tar: Child returned status 1

tar: Error is not recoverable: exiting now

+ rm /tmp/tmp.6vPgP5mYq4/flink.tgz

+ local flink_toplevel

++ compgen -G '/tmp/tmp.6vPgP5mYq4/flink-*'

++ head -n1

+ flink_toplevel=

+ mv '' /usr/lib/flink

mv: cannot stat '': No such file or directory

+ popd

/

+ configure_flink

+ local num_workers

++ /usr/share/google/get_metadata_value attributes/dataproc-worker-count

+ num_workers=2

+ local num_taskmanagers=1

+ local spark_executor_cores

++ grep 'spark\.executor\.cores' /etc/spark/conf/spark-defaults.conf

++ tail -n1

++ cut -d= -f2

+ spark_executor_cores=2

+ local flink_taskmanager_slots=4

+ local flink_parallelism

++ python -c 'print 1 * 4'

+ flink_parallelism=4

+ local worker_total_mem

++ hdfs getconf -confKey yarn.nodemanager.resource.memory-mb

+ worker_total_mem=12288

+ local flink_jobmanager_memory

++ python -c 'print int(12288 * 1.0)'

+ flink_jobmanager_memory=12288

+ local flink_taskmanager_memory

++ python -c 'print int(12288 * 1.0)'

+ flink_taskmanager_memory=12288

+ local master_hostname

++ /usr/share/google/get_metadata_value attributes/dataproc-master

+ master_hostname=test-cluster-m

+ mkdir -p /var/lib/flink

+ cat

/etc/google-dataproc/startup-scripts/dataproc-initialization-script-0: line 150: /usr/lib/flink/conf/flink-conf.yaml: No such file or directory

+ cat

+ chmod +x /usr/bin/flink-yarn-daemon

+ [[ Master == \M\a\s\t\e\r ]]

+ start_flink_master

+ local master_hostname

++ /usr/share/google/get_metadata_value attributes/dataproc-master

+ master_hostname=test-cluster-m

+ local start_yarn_session

++ /usr/share/google/get_metadata_value attributes/flink-start-yarn-session

++ echo true

+ start_yarn_session=true

+ [[ true == \t\r\u\e ]]

+ [[ test-cluster-m == \t\e\s\t\-\c\l\u\s\t\e\r\-\m ]]

+ /usr/bin/flink-yarn-daemon

+ sudo -u yarn -i HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/yarn-session.sh -n 1 -s 4 -jm 12288 -tm 12288 -nm flink-dataproc --detached

-bash: /usr/lib/flink/bin/yarn-session.sh: No such file or directory

+ err 'Unable to start Flink master'

++ date +%Y-%m-%dT%H:%M:%S%z

+ echo '[2020-02-08T06:21:30+0000]: Unable to start Flink master'

[2020-02-08T06:21:30+0000]: Unable to start Flink master

+ return 1


Any suggestions?

On Fri, Feb 7, 2020 at 7:19 AM Paweł Kordek <pa...@outlook.com> wrote:
Hi

I had a similar use case recently, and adding a metadata key solved the issue: https://github.com/GoogleCloudDataproc/initialization-actions/pull/334. You keep the original initialization action and add, for example (using gcloud), '--metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'

Cheers
Pawel
________________________________
From: Ismaël Mejía <ie...@gmail.com>
Sent: Friday, February 7, 2020 2:24 PM
To: Xander Song <ia...@gmail.com>; user@beam.apache.org <us...@beam.apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

+user@beam.apache.org


On Fri, Feb 7, 2020 at 12:54 AM Xander Song <ia...@gmail.com> wrote:
I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I have followed the instructions at this repo<https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink> to create a Flink cluster on Dataproc using an initialization action. However, the resulting cluster uses version 1.5.6 of Flink, and my project requires a more recent version (version 1.7, 1.8, or 1.9) for compatibility with Beam<https://beam.apache.org/documentation/runners/flink/>.

Inside of the flink.sh script in the linked repo, there is a line for installing Flink from a snapshot URL instead of apt<https://github.com/GoogleCloudDataproc/initialization-actions/blob/81e453d8f8a036e371e144d5103aaa38ecb2c679/flink/flink.sh#L53>. Is this the correct mechanism for installing a different version of Flink using the initialization script? If so, how is it meant to be used?

Thank you in advance.


Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

Posted by Paweł Kordek <pa...@outlook.com>.
Hi

I had a similar use case recently, and adding a metadata key solved the issue: https://github.com/GoogleCloudDataproc/initialization-actions/pull/334. You keep the original initialization action and add, for example (using gcloud), '--metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'
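
For illustration, the flag can be combined with the rest of the cluster-creation command roughly as sketched here. The helper names flink_tarball_url and print_create_command are hypothetical, the archive.apache.org path layout is an assumption to verify against the Flink downloads page, and the cluster name and region are placeholders. The script only prints the command so that it can be reviewed before it is run.

```shell
#!/usr/bin/env bash
# Sketch: derive a Flink tarball URL from a version number and print the
# matching cluster-creation command without executing it.

# Assumption: old releases live under dist/flink/flink-<version>/ on
# archive.apache.org. The Scala version defaults to 2.11.
flink_tarball_url() {
  local flink_version
  local scala_version
  flink_version="$1"
  scala_version="${2:-2.11}"
  printf 'https://archive.apache.org/dist/flink/flink-%s/flink-%s-bin-scala_%s.tgz\n' \
    "$flink_version" "$flink_version" "$scala_version"
}

# Prints the gcloud command that keeps the original initialization action
# and adds the flink-snapshot-url metadata key.
print_create_command() {
  local cluster_name
  local region
  local flink_url
  cluster_name="$1"
  region="$2"
  flink_url="$3"
  printf 'gcloud dataproc clusters create %s \\\n' "$cluster_name"
  printf '    --region %s \\\n' "$region"
  printf '    --initialization-actions gs://goog-dataproc-initialization-actions-%s/flink/flink.sh \\\n' "$region"
  printf '    --metadata flink-snapshot-url=%s\n' "$flink_url"
}

# Placeholder values; adjust before use.
url="$(flink_tarball_url 1.9.2)"
print_create_command test-cluster us-west1 "$url"
```

Keeping the version in one place makes it easier to switch releases when a mirror stops hosting an older one.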

Cheers
Pawel
________________________________
From: Ismaël Mejía <ie...@gmail.com>
Sent: Friday, February 7, 2020 2:24 PM
To: Xander Song <ia...@gmail.com>; user@beam.apache.org <us...@beam.apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

+user@beam.apache.org


On Fri, Feb 7, 2020 at 12:54 AM Xander Song <ia...@gmail.com> wrote:
I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I have followed the instructions at this repo<https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink> to create a Flink cluster on Dataproc using an initialization action. However, the resulting cluster uses version 1.5.6 of Flink, and my project requires a more recent version (version 1.7, 1.8, or 1.9) for compatibility with Beam<https://beam.apache.org/documentation/runners/flink/>.

Inside of the flink.sh script in the linked repo, there is a line for installing Flink from a snapshot URL instead of apt<https://github.com/GoogleCloudDataproc/initialization-actions/blob/81e453d8f8a036e371e144d5103aaa38ecb2c679/flink/flink.sh#L53>. Is this the correct mechanism for installing a different version of Flink using the initialization script? If so, how is it meant to be used?

Thank you in advance.

Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

Posted by Ismaël Mejía <ie...@gmail.com>.
+user@beam.apache.org <us...@beam.apache.org>


On Fri, Feb 7, 2020 at 12:54 AM Xander Song <ia...@gmail.com> wrote:

> I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I
> have followed the instructions at this repo
> <https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink> to
> create a Flink cluster on Dataproc using an initialization action. However,
> the resulting cluster uses version 1.5.6 of Flink, and my project requires
> a more recent version (version 1.7, 1.8, or 1.9) for compatibility with
> Beam <https://beam.apache.org/documentation/runners/flink/>.
>
> Inside of the flink.sh script in the linked repo, there is a line for
> installing Flink from a snapshot URL instead of apt
> <https://github.com/GoogleCloudDataproc/initialization-actions/blob/81e453d8f8a036e371e144d5103aaa38ecb2c679/flink/flink.sh#L53>.
> Is this the correct mechanism for installing a different version of Flink
> using the initialization script? If so, how is it meant to be used?
>
> Thank you in advance.
>
