Posted to issues@flink.apache.org by "Aaron Langford (JIRA)" <ji...@apache.org> on 2018/06/20 00:16:00 UTC

[jira] [Created] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set

Aaron Langford created FLINK-9618:
-------------------------------------

             Summary: NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set
                 Key: FLINK-9618
                 URL: https://issues.apache.org/jira/browse/FLINK-9618
             Project: Flink
          Issue Type: Bug
          Components: Kinesis Connector
    Affects Versions: 1.5.0
         Environment: N/A
            Reporter: Aaron Langford


This problem arose while trying to write to a local kinesalite instance. Specifying both aws.region and aws.endpoint is not allowed. However, when aws.region is absent and only aws.endpoint is set, a NullPointerException is thrown.

Here is some example Scala code:
{code:java}
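import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

/**
  * Builds a Kinesis sink for the given stream.
  *
  * @param region the AWS region the stream lives in
  * @param streamName the stream to write records to
  * @param endpoint if in local dev, this points to a kinesalite instance
  * @return a FlinkKinesisProducer that writes to streamName by default
  */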
def getSink(region: String,
            streamName: String,
            endpoint: Option[String]): FlinkKinesisProducer[ProcessedMobilePageView] = {
  val props = new Properties()
  props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")

  // Set exactly one of endpoint and region, since setting both is rejected.
  endpoint match {
    case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri)
    case None => props.put(AWSConfigConstants.AWS_REGION, region)
  }

  val producer = new FlinkKinesisProducer[ProcessedMobilePageView](
    new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder),
    props
  )
  producer.setDefaultStream(streamName)

  producer
}
{code}
To reproduce the NullPointerException, pass `Some("localhost:4567")` as the endpoint.

The error originates in org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on line 194. This line should check whether aws.endpoint is present before reading it from the Properties object.
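
The kind of presence check suggested above could look roughly like the following. This is only a sketch, not the connector's actual code: `EndpointOrRegion`, `Target`, `Region`, `Endpoint` and `resolve` are hypothetical names, the property keys are the ones named in this report, and rejecting the case where neither property is set is an assumption. It relies on `Properties.getProperty` returning null for a missing key, which `Option(...)` converts to `None`.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink: decides where a producer should
// connect, using Option so an absent property never surfaces as null.
object EndpointOrRegion {
  // Property keys as named in this report.
  val RegionKey = "aws.region"
  val EndpointKey = "aws.endpoint"

  sealed trait Target
  final case class Region(name: String) extends Target
  final case class Endpoint(uri: String) extends Target

  def resolve(props: Properties): Either[String, Target] = {
    // Option(...) turns the null returned for a missing key into None.
    val region = Option(props.getProperty(RegionKey))
    val endpoint = Option(props.getProperty(EndpointKey))

    (region, endpoint) match {
      // Both set: not allowed, as described in this report.
      case (Some(_), Some(_)) => Left(s"Set only one of $RegionKey and $EndpointKey")
      case (Some(r), None)    => Right(Region(r))
      case (None, Some(e))    => Right(Endpoint(e))
      // Neither set: rejected here by assumption.
      case (None, None)       => Left(s"Set one of $RegionKey or $EndpointKey")
    }
  }
}
```

With only aws.endpoint set to "localhost:4567", `resolve` yields an `Endpoint` instead of dereferencing a missing region, so the caller can skip any region-specific setup.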


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)