Posted to user@flink.apache.org by jonas eyob <jo...@gmail.com> on 2021/12/02 23:05:01 UTC

Cannot consume from Kinesalite using FlinkKinesisConsumer

Hi all, I have a really simple pipeline that consumes events from a local
Kinesis (kinesalite) and prints them to stdout, but I'm struggling to make
sense of why it fails almost immediately.

The pipeline code:

/* Added this to verify it wasn't a problem with AWS CBOR, which needs to be disabled */
System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "true")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val consumerConfig = new Properties()

consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")

env
  .addSource(
    new FlinkKinesisConsumer[String](
      "user-profile-events-local",
      new SimpleStringSchema,
      consumerConfig
    )
  )
  .print()

env.execute("echo stream")

Running this locally produces the following error:

22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
switched from INITIALIZING to RUNNING.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient
createSocketFactoryRegistry
WARNING: SSL Certificate checking for endpoints has been explicitly
disabled.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source ->
Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264) switched
from RUNNING to FAILED with failure cause:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
null (Service: AmazonKinesis; Status Code: 400; Error Code:
UnknownOperationException; Request ID:
05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Suppressed: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at
org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
22:27:24.329 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources
for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0
(7e920c6918655278fbd09e7658847264).
22:27:24.341 [flink-akka.actor.default-dispatcher-9] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state FAILED to JobManager for task Source:
Custom Source -> Sink: Print to Std. Out (1/1)#0
7e920c6918655278fbd09e7658847264.
22:27:24.359 [flink-akka.actor.default-dispatcher-5] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
switched from RUNNING to FAILED on 948a82f7-e5e2-4e5f-b309-49ee92eb2006 @
localhost (dataPort=-1).
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
null (Service: AmazonKinesis; Status Code: 400; Error Code:
UnknownOperationException; Request ID:
05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
Suppressed: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
~[flink-core-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-runtime-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

-- 
*Kind regards*
*Jonas Eyob*

Re: Cannot consume from Kinesalite using FlinkKinesisConsumer

Posted by jonas eyob <jo...@gmail.com>.
Hey Mika,

We're using kinesalite 1.11.5.

Yeah, after bumping it to 3.3.3, the first issue (not being able to list
shards) disappeared, and instead I'm seeing the same issue you mention:

"The timestampInMillis parameter cannot be greater than the
currentTimestampInMillis"

Found a discussion on the same topic:
https://github.com/awslabs/amazon-kinesis-connector-flink/issues/13
It appears to be an issue with the AWS SDK, likely caused by disabling
AWS CBOR (which is required when running locally).

Specifically, when using AT_TIMESTAMP (which LATEST is translated to, see
https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L310-L331),
the Flink job appears to request the ShardIterator with a timestamp in
milliseconds rather than seconds.

This triggers the validation error raised by kinesalite here:
https://github.com/mhart/kinesalite/blob/4019d70a135226f33f1cdec4091f4391e631d2c9/actions/getShardIterator.js#L79-L89
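To illustrate the unit mismatch described above, here is a minimal,
self-contained Scala sketch. It is not kinesalite's or Flink's actual code:
the object TimestampUnitCheck and the method isInFuture are hypothetical
names, and the check assumes the server treats the incoming timestamp as
epoch seconds and compares it against its own clock in milliseconds.

```scala
object TimestampUnitCheck {
  // Hypothetical model of a server-side check: the incoming value is
  // assumed to be epoch seconds and is scaled to milliseconds before
  // being compared with the server clock.
  def isInFuture(timestampFromClient: Double, serverNowMillis: Long): Boolean =
    timestampFromClient * 1000 > serverNowMillis

  def main(args: Array[String]): Unit = {
    val nowMillis = System.currentTimeMillis()

    // The same instant (one second ago), expressed in two different units.
    val sentAsSeconds = nowMillis / 1000.0 - 1
    val sentAsMillis = (nowMillis - 1000).toDouble

    // Seconds are accepted: the scaled value lies in the past.
    println(isInFuture(sentAsSeconds, nowMillis))
    // Milliseconds are rejected: scaling them again lands far in the future.
    println(isInFuture(sentAsMillis, nowMillis))
  }
}
```

Under that assumption, a timestamp sent in milliseconds always looks like
it is in the future, which would match the error message quoted above.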

Perhaps using TRIM_HORIZON locally is the workaround for now, since a fix
doesn't appear to be in sight.
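As a rough sketch of that workaround, the consumer configuration from the
original post could be changed as below. Only the initial position differs;
the wrapper object LocalKinesisConfig and the method consumerConfig are
hypothetical names used for illustration, and the endpoint and fake
credentials are the ones from the original pipeline. Note that TRIM_HORIZON
starts from the oldest record still in the stream, so records already in
the local stream will be read as well.

```scala
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

object LocalKinesisConfig {
  // Builds consumer properties for a local kinesalite endpoint.
  def consumerConfig(): Properties = {
    val config = new Properties()
    config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
    config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
    config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
    config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")
    config.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
    // TRIM_HORIZON instead of LATEST, so no AT_TIMESTAMP iterator is requested.
    config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
    config
  }
}
```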


On Fri, 3 Dec 2021 at 12:47, Mika Naylor <ma...@autophagy.io> wrote:

> Hey Jonas,
>
> May I ask what version of Kinesalite you're targeting? With 3.3.3 and
> STREAM_INITIAL_POSITION = "LATEST", I received a "The timestampInMillis
> parameter cannot be greater than the currentTimestampInMillis" which may
> be a misconfiguration on my setup, but with STREAM_INITIAL_POSITION =
> "TRIM_HORIZON" I was able to consume events from the stream.
>
> This was with 1.14.0 of the Kinesis Flink connector.
>
> Kind regards,
> Mika
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at
> >org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> >
> >--
> >*Med Vänliga Hälsningar*
> >*Jonas Eyob*
>
> Mika Naylor
> https://autophagy.io
>


-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*

Re: Cannot consum from Kinesalite using FlinkKinesisConsumer

Posted by Mika Naylor <ma...@autophagy.io>.
Hey Jonas,

May I ask which version of Kinesalite you're targeting? With 3.3.3 and
STREAM_INITIAL_POSITION = "LATEST", I received the error "The timestampInMillis
parameter cannot be greater than the currentTimestampInMillis", which may
be a misconfiguration in my setup. With STREAM_INITIAL_POSITION =
"TRIM_HORIZON", however, I was able to consume events from the stream.

This was with version 1.14.0 of the Flink Kinesis connector.
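
For reference, a minimal sketch of that change, assuming the same consumer
properties as in the original post (region, fake credentials, and the local
endpoint on port 4567) with only the initial position switched. The object
and method names here are made up for illustration, and the imports assume
the 1.14.0 connector's package layout:

```scala
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

// Hypothetical helper: builds the consumer properties from the original
// post, but lets the caller choose the initial stream position.
object KinesaliteConsumerConfig {

  def build(initialPosition: String = "TRIM_HORIZON"): Properties = {
    val consumerConfig = new Properties()

    // Values copied from the original pipeline.
    consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
    consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
    consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")
    consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")

    // The only difference: "TRIM_HORIZON" instead of "LATEST".
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, initialPosition)

    consumerConfig
  }
}
```

Passing KinesaliteConsumerConfig.build() as the third argument to
FlinkKinesisConsumer would leave the rest of the pipeline unchanged.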

Kind regards,
Mika



Mika Naylor
https://autophagy.io