You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Varsha Thanooj (JIRA)" <ji...@apache.org> on 2018/08/02 09:29:00 UTC

[jira] [Created] (BEAM-5060) Issues with aws KPL while writing to kinesis using beam

Varsha Thanooj created BEAM-5060:
------------------------------------

             Summary: Issues with aws KPL while writing to kinesis using beam
                 Key: BEAM-5060
                 URL: https://issues.apache.org/jira/browse/BEAM-5060
             Project: Beam
          Issue Type: Bug
          Components: io-java-aws
    Affects Versions: 2.5.0
            Reporter: Varsha Thanooj
            Assignee: Ismaël Mejía


I am trying to write data to kinesis using apache beam kinesis IO. But I am having some issues.

PS: I am using aws sts.

 

The console output shows....

 
{code:java}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;

at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)

at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)

at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)

at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)

at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)

at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)

at com.nestaway.beam_demo.KinesisSql.main(KinesisSql.java:153)

Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;

at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.processElement(KinesisIO.java:568)


{code}
 

 

Code.......

data is a Pcollection in byte[ ] format.

 
{code:java}
data.apply(KinesisIO.write()
    .withStreamName("stagWatchBallEventStream")
    .withPartitionKey("a")
    .withAWSClientsProvider(new CustomKinesisClientProvider()));
{code}
 

 Custom Kinesis Client :
{code:java}
public class CustomKinesisClientProvider implements AWSClientsProvider {
private static final long serialVersionUID = 1L;

private static String ID = "XXXXX";

private static String SECRET = "XXXXX";

private static String TOKEN = "XXXXX";

private static BasicSessionCredentials sessionCredentials = new BasicSessionCredentials(

  ID,

  SECRET,

  TOKEN);



private static KinesisProducerConfiguration config = new KinesisProducerConfiguration()

           .setRecordMaxBufferedTime(3000)

           .setMaxConnections(1)

           .setRequestTimeout(60000)

           .setRegion("us-west-2")

           .setCredentialsProvider(new AWSStaticCredentialsProvider(sessionCredentials));

@Override
public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
return new KinesisProducer(config);
}

}{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)