Posted to user@beam.apache.org by tc...@tutanota.com on 2020/09/30 20:56:20 UTC

SqsIO exception when moving to AWS2 SDK

I've been attempting to migrate to the AWS2 SDK (version 2.24.0) on Apache Spark 2.4.7.  However, when switching over to the new API and running it, I keep getting the following exception:

2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
	at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
	at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
	at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
	at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
	
Examining the source of SqsUnboundedSource reveals a lambda where it's trying to chain a few references:
    read.sqsClientProvider().getSqsClient()

Which is odd as I explicitly set the client provider on the read transform.  This was working well enough with the old SqsIO API to connect and process messages off the queue.

Any thoughts on why this might be happening?  Or avenues to pursue in debugging this?

Thanks.


Re: SqsIO exception when moving to AWS2 SDK

Posted by tc...@tutanota.com.
Thanks for your work on this!  We've since gone back to the original AWS SDK, but I'll give AWS2 another try once 2.27.0 is out.

Dec 15, 2020, 04:17 by aromanenko.dev@gmail.com:

> Too fast “Send” button click =)
>
> You can find snapshot artifacts here:
> https://repository.apache.org/content/repositories/snapshots/org/apache/beam/beam-sdks-java-io-amazon-web-services2/2.27.0-SNAPSHOT/


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
Too fast “Send” button click =)

You can find snapshot artifacts here:
https://repository.apache.org/content/repositories/snapshots/org/apache/beam/beam-sdks-java-io-amazon-web-services2/2.27.0-SNAPSHOT/

> On 15 Dec 2020, at 13:14, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> Quick update on this.
> 
> We have fixed an issue with AwsCredentialsProvider serialisation [1] for AWS v2 IOs (well, why it’s not serialisable by default is a different question) in 2.27.0.
> Since it’s not yet released, feel free to test it with snapshot artifacts.
> 
> [1] https://issues.apache.org/jira/browse/BEAM-11016


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
Quick update on this.

We have fixed an issue with AwsCredentialsProvider serialisation [1] for AWS v2 IOs (well, why it’s not serialisable by default is a different question) in 2.27.0.
Since it’s not yet released, feel free to test it with snapshot artifacts.

[1] https://issues.apache.org/jira/browse/BEAM-11016

> On 6 Oct 2020, at 23:25, tclemons@tutanota.com wrote:
> 
> Yep, same stacktrace.  NPE originating from line 41 of SqsUnboundedSource.java.
> 
> I'll see about getting a remote debugger attached to the process.


Re: SqsIO exception when moving to AWS2 SDK

Posted by tc...@tutanota.com.
Yep, same stacktrace.  NPE originating from line 41 of SqsUnboundedSource.java.

I'll see about getting a remote debugger attached to the process.

Oct 6, 2020, 14:06 by aromanenko.dev@gmail.com:

> Hmm, do you have the same stack trace in this case?
>
> Can you debug it at runtime and check whether read.sqsClientProvider() returns null in SqsUnboundedSource(Read)? I’m curious whether all calls to SqsUnboundedSource(Read) were made in the same JVM.


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
Hmm, do you have the same stack trace in this case?

Can you debug it at runtime and check whether read.sqsClientProvider() returns null in SqsUnboundedSource(Read)? I’m curious whether all calls to SqsUnboundedSource(Read) were made in the same JVM.

> On 6 Oct 2020, at 22:14, tclemons@tutanota.com wrote:
> 
> To test this, I tried a workaround: an implementation of AwsCredentialsProvider that also implements Serializable.  The resolveCredentials method of this class calls the static create function of DefaultCredentialsProvider and forwards the task to it.  There are no fields in the class that should actually need to be serialized.
> 
> However, this approach is failing in the exact same manner.  I suspect something else might be the culprit here.


Re: SqsIO exception when moving to AWS2 SDK

Posted by tc...@tutanota.com.
To test this, I tried a workaround: an implementation of AwsCredentialsProvider that also implements Serializable.  The resolveCredentials method of this class calls the static create function of DefaultCredentialsProvider and forwards the task to it.  There are no fields in the class that should actually need to be serialized.

However, this approach is failing in the exact same manner.  I suspect something else might be the culprit here.
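
For readers following along, the shape of the workaround described above can be sketched with the JDK alone. This is only an illustration under stated assumptions: CredentialsSource and DefaultSource are hypothetical stand-ins for the SDK's AwsCredentialsProvider and DefaultCredentialsProvider, and the string they return is a placeholder, not a real credential type. The sketch shows that a field-free Serializable wrapper survives a plain Java serialization round trip; it says nothing about why the NPE still occurs inside SqsIO.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializableWrapperSketch {

  // Hypothetical stand-in for the SDK's AwsCredentialsProvider interface.
  interface CredentialsSource {
    String resolveCredentials();
  }

  // Hypothetical stand-in for DefaultCredentialsProvider; deliberately NOT Serializable.
  static final class DefaultSource implements CredentialsSource {
    static DefaultSource create() {
      return new DefaultSource();
    }

    @Override
    public String resolveCredentials() {
      return "credentials-from-default-chain";
    }
  }

  // The workaround pattern: Serializable, no fields, delegate obtained on every call.
  static final class SerializableSource implements CredentialsSource, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public String resolveCredentials() {
      return DefaultSource.create().resolveCredentials();
    }
  }

  // Serializes and then deserializes a value using plain Java serialization.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // The deserialized copy can still resolve credentials because it holds no state.
    SerializableSource copy = roundTrip(new SerializableSource());
    System.out.println(copy.resolveCredentials());
  }
}
```

A wrapper like this passing a round trip is consistent with the observation above that the credentials provider alone may not be the whole story.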

Oct 5, 2020, 08:56 by aromanenko.dev@gmail.com:

> Seems like there is an issue with non-serialisable AwsCredentialsProvider as a member of BasicSqsClientProvider (which is Serializable).


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
I created a Jira for that:
https://issues.apache.org/jira/browse/BEAM-11016

> On 5 Oct 2020, at 17:56, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> Seems like there is an issue with non-serialisable AwsCredentialsProvider as a member of BasicSqsClientProvider (which is Serializable).


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
It seems the issue is that AwsCredentialsProvider, which is not serializable, is held as a member of BasicSqsClientProvider (which is Serializable).
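As a rough illustration of how this kind of problem can surface as a NullPointerException on an executor, here is a minimal, self-contained sketch. It does not use Beam or the AWS SDK; the classes `Credentials`, `EagerProvider` and `LazyProvider` are hypothetical stand-ins, and the use of `transient` is an assumption about one way a non-serializable member can be dropped, not a description of how BasicSqsClientProvider is actually written. The sketch shows a member that is skipped during Java serialization coming back as null, and one possible workaround: keep only serializable settings in the object and rebuild the dependency lazily after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

  /** Hypothetical stand-in for a non-serializable dependency such as a credentials provider. */
  static class Credentials {
    final String name;

    Credentials(String name) {
      this.name = name;
    }
  }

  /** Serializable holder whose non-serializable member is skipped when it is serialized. */
  static class EagerProvider implements Serializable {
    private static final long serialVersionUID = 1L;
    private final transient Credentials credentials;

    EagerProvider(Credentials credentials) {
      this.credentials = credentials;
    }

    String describe() {
      // After a serialization round trip the transient field is null, so this dereference fails.
      return credentials.name;
    }
  }

  /** Holder that keeps only serializable settings and rebuilds the dependency on first use. */
  static class LazyProvider implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;
    private transient Credentials credentials;

    LazyProvider(String name) {
      this.name = name;
    }

    String describe() {
      if (credentials == null) {
        // Recreate the dependency on whichever JVM the object was deserialized in.
        credentials = new Credentials(name);
      }
      return credentials.name;
    }
  }

  /** Serializes and deserializes a value in memory, roughly as a runner does when shipping it to a worker. */
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    EagerProvider eager = roundTrip(new EagerProvider(new Credentials("env")));
    try {
      eager.describe();
    } catch (NullPointerException e) {
      System.out.println("eager provider lost its credentials after the round trip");
    }

    LazyProvider lazy = roundTrip(new LazyProvider("env"));
    System.out.println("lazy provider rebuilt credentials: " + lazy.describe());
  }
}
```

If the real cause is along these lines, a user-side workaround might be a custom serializable client provider that creates the credentials provider inside its client-building method rather than holding it as a field; whether that applies here would need to be checked against the Beam source.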

> On 2 Oct 2020, at 20:11, tclemons@tutanota.com wrote:
> 
> The app itself is developed in Clojure, but here's the gist of how it's getting configured:
> 
>     AwsCredentialsProvider credProvider = EnvironmentVariableCredentialsProvider.create();
>     
>     pipeline.apply(
>       SqsIO.read()
>         .withQueueUrl(url)
>         .withSqsClientProvider(credProvider, region, endpoint));
> 


Re: SqsIO exception when moving to AWS2 SDK

Posted by tc...@tutanota.com.
The app itself is developed in Clojure, but here's the gist of how it's getting configured:

    AwsCredentialsProvider credProvider = EnvironmentVariableCredentialsProvider.create();
    
    pipeline.apply(
      SqsIO.read()
        .withQueueUrl(url)
        .withSqsClientProvider(credProvider, region, endpoint));

Oct 1, 2020, 08:48 by aromanenko.dev@gmail.com:

> Could you send a code snippet of your pipeline with SqsIO v2 Read transform configuration?
>


Re: SqsIO exception when moving to AWS2 SDK

Posted by Alexey Romanenko <ar...@gmail.com>.
Could you send a code snippet of your pipeline with SqsIO v2 Read transform configuration?
