Posted to user@beam.apache.org by Gayan Weerakutti <ga...@linuxdeveloper.space> on 2021/08/14 09:49:52 UTC
Fwd: Authenticating with Google Cloud from Apache Beam application
via code
Hi,
I'm trying to deploy an Apache Beam application in a managed Apache
Flink cluster (Kinesis Data Analytics
<https://aws.amazon.com/kinesis/data-analytics/>). The pipeline uses
the PubsubIO
<https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.html>
connector. But I can't get the application to authenticate with Google
Cloud, because Kinesis Data Analytics does not allow setting environment
variables, so exporting the GOOGLE_APPLICATION_CREDENTIALS
<https://cloud.google.com/docs/authentication/getting-started>
environment variable doesn't seem to be an option.
I tried to authenticate from code, as below.
    GoogleCredentials credential =
        GoogleCredentials.fromStream(credentialJsonInputStream);
    options.as(GcpOptions.class).setGcpCredential(credential);
    Pipeline pipeline = Pipeline.create(options);
But that didn't work. I'm not sure whether the GcpOptions
<https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.html>
interface is only supposed to be used with the Dataflow runner.
I'd really appreciate any insights on how to authenticate in this scenario.
Thanks and regards,
*Gayan Weerakutti*
<https://www.linkedin.com/in/reversiblean/>linkedin.com/in/gayanweerakutti
<https://www.linkedin.com/in/gayanweerakutti/>
Re: Fwd: Authenticating with Google Cloud from Apache Beam
application via code
Posted by Luke Cwik <lc...@google.com>.
The credentials object isn't serializable (which is common for credential
objects). The PipelineOptions javadoc[1] says that properties whose getters
are marked with @JsonIgnore aren't serialized, so they aren't available on
other instances (getGcpCredential is annotated with @JsonIgnore).
1:
https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/options/PipelineOptions.html
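The effect described above can be reproduced without Beam. The sketch below is only an analogy: it uses plain Java serialization and a transient field in place of Beam's JSON serialization and @JsonIgnore, and the Options class and roundTrip helper are hypothetical names, not Beam classes. It shows that a property excluded from serialization comes back as null once the options have been shipped elsewhere, which matches the null returned by getGcpCredential.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class IgnoredPropertyDemo {

  /** Hypothetical stand-in for pipeline options; this is not a Beam class. */
  static class Options implements Serializable {
    private static final long serialVersionUID = 1L;
    String project;               // serialized together with the options
    transient Object credential;  // skipped, playing the role of a @JsonIgnore property
  }

  /** Serializes the options and reads them back, as if handing them to a worker. */
  static Options roundTrip(Options in) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream oin =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Options) oin.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Options local = new Options();
    local.project = "gcp-project-id";
    local.credential = new Object(); // set on the submitting side only

    Options remote = roundTrip(local);
    System.out.println(remote.project);    // prints "gcp-project-id"
    System.out.println(remote.credential); // prints "null"
  }
}
```

The ordinary field survives the round trip while the excluded one does not, so any code that later reads the credential from the deserialized options has nothing to authenticate with.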
On Sun, Aug 15, 2021 at 11:02 AM Gayan Weerakutti <
gayan@linuxdeveloper.space> wrote:
> I was finally able to authenticate with GCP by using
> GcpOptions#credentialFactoryClass
> <https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.html#setCredentialFactoryClass-java.lang.Class->
> method instead of calling options.setGcpCredential(credential).
>
> When setGcpCredential is used, GcpOptions#getGcpCredential returns null
> when called from both Direct and the Flink runner.
>
> Thanks.
Re: Fwd: Authenticating with Google Cloud from Apache Beam
application via code
Posted by Gayan Weerakutti <ga...@linuxdeveloper.space>.
I was finally able to authenticate with GCP by using the
GcpOptions#setCredentialFactoryClass
<https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.html#setCredentialFactoryClass-java.lang.Class->
method instead of calling options.setGcpCredential(credential).
When setGcpCredential is used, GcpOptions#getGcpCredential returns null
when called from both the Direct and the Flink runners.
Thanks.
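The thread does not show the factory class that was used, so the following is only a simplified model of why a factory class can work where a credential object does not. It has no Beam dependency: TokenFactory, BundledKeyFactory and credentialOnWorker are hypothetical names, and the static fromOptions lookup is an assumption modelled on how Beam appears to instantiate credential factories. The idea is that a class name is trivially serializable, so each worker can rebuild the credential locally.

```java
import java.lang.reflect.Method;

public class CredentialFactoryModel {

  /** Hypothetical stand-in for a credential factory interface. */
  interface TokenFactory {
    String getCredential();
  }

  /** Hypothetical factory; a real one might read a key file bundled in the jar. */
  public static class BundledKeyFactory implements TokenFactory {
    // Assumed convention: the factory is created through a static method.
    public static BundledKeyFactory fromOptions(String project) {
      return new BundledKeyFactory();
    }

    @Override
    public String getCredential() {
      return "credential-loaded-on-worker";
    }
  }

  /** What a worker can do when it receives only the factory's class name. */
  static String credentialOnWorker(String factoryClassName, String project)
      throws Exception {
    Class<?> factoryClass = Class.forName(factoryClassName);
    Method fromOptions = factoryClass.getMethod("fromOptions", String.class);
    TokenFactory factory = (TokenFactory) fromOptions.invoke(null, project);
    return factory.getCredential();
  }

  public static void main(String[] args) throws Exception {
    // Only this string needs to travel with the serialized options.
    String serializedSetting = BundledKeyFactory.class.getName();
    System.out.println(credentialOnWorker(serializedSetting, "gcp-project-id"));
    // prints "credential-loaded-on-worker"
  }
}
```

In a real pipeline the factory would implement Beam's own credential factory interface and be registered through setCredentialFactoryClass; the exact interface and method signatures should be checked against the javadoc for the Beam version in use.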
Re: Fwd: Authenticating with Google Cloud from Apache Beam
application via code
Posted by Gayan Weerakutti <ga...@linuxdeveloper.space>.
Hi,
I want to update my question with a few new findings.
The GoogleCredentials instance is being passed to the pipeline as below.
    GoogleCredentials credential = GoogleCredentials
        .fromStream(credentialJsonInputStream)
        .createScoped("https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/pubsub");
    credential.refreshIfExpired();
    options.setGcpCredential(credential);
    options.setProject("gcp-project-id");
The /options/ reference above extends PubsubOptions
<https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubOptions.html>.
The exception returned when running the application is:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
{
  "code" : 403,
  "errors" : [ { "domain" : "global", "message" : "The request is missing a valid API key.", "reason" : "forbidden" } ],
  "message" : "The request is missing a valid API key.",
  "status" : "PERMISSION_DENIED"
}
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
    at com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://pubsub.googleapis.com/v1/projects/my-project/topics/my-topic:publish
{
  "code" : 403,
  "errors" : [ { "domain" : "global", "message" : "The request is missing a valid API key.", "reason" : "forbidden" } ],
  "message" : "The request is missing a valid API key.",
  "status" : "PERMISSION_DENIED"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.publish(PubsubJsonClient.java:141)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink$WriterFn.publishBatch(PubsubUnboundedSink.java:226)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink$WriterFn.processElement(PubsubUnboundedSink.java:266)
While debugging I noticed that the PubsubOptions reference passed to
org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClient
<https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java#L82>
returns null when calling GcpOptions#getGcpCredential.
This project includes the following dependencies:
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.10.2
com.amazonaws:aws-kinesisanalytics-runtime:1.2.0
org.apache.beam:beam-sdks-java-core:2.28.0
org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.28.0
org.apache.beam:beam-runners-flink-1.11:2.28.0
org.apache.beam:beam-sdks-java-io-amazon-web-services:2.28.0
org.apache.beam:beam-sdks-java-io-kinesis:2.28.0
com.amazonaws:aws-java-sdk-logs:1.11.903
com.amazonaws:aws-java-sdk-bom:1.11.903
com.google.cloud:libraries-bom:13.2.0
I highly appreciate any help!