Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/07/05 18:23:00 UTC

[jira] [Reopened] (KAFKA-5528) Error while reading topic, offset, partition info from process method

     [ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-5528:
------------------------------------

> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} when accessing {{context.topic()}} from the {{process}} method. The code is written in Scala and launched using sbt (Spring isn't involved). Here's a sketch of the code:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
>     super.init(processorContext) 
>     hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
>     val topic: String = this.context.topic() 
>     val partition: Int = this.context.partition()
>     val offset: Long = this.context.offset() 
>     val timestamp: Long = this.context.timestamp() 
>     // business logic 
>   }
> }
> {noformat}
> The exception is thrown only in the multi-consumer case (when the topic has more than one partition and parallelism is greater than 1).
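
The report does not show how GenericProcessor is registered with the topology, so the following is a sketch of something worth ruling out, not a diagnosis. Kafka Streams calls ProcessorSupplier.get() once per stream task, and the supplier is expected to return a new Processor each time. If a single instance were shared, each task's init() would overwrite the context held by that instance, and a metadata call such as context.topic() could then reach a context that has no record in flight. That would fit a failure that appears only with more than one partition and thread. The names MetadataProcessor and MetadataProcessorSupplier are hypothetical and do not come from the report.

```scala
import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorSupplier}

// Hypothetical processor that only reads record metadata.
class MetadataProcessor extends AbstractProcessor[Array[Byte], Array[Byte]] {
  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    // context() returns the ProcessorContext passed to this instance's init().
    val ctx = context()
    println(s"${ctx.topic()}-${ctx.partition()}@${ctx.offset()}")
  }
}

// Hypothetical supplier: get() builds a new processor on every call,
// so no instance (and no context reference) is shared between stream tasks.
class MetadataProcessorSupplier extends ProcessorSupplier[Array[Byte], Array[Byte]] {
  override def get(): Processor[Array[Byte], Array[Byte]] = new MetadataProcessor
}
```

A supplier written as a lambda or anonymous class that closes over one pre-built processor would compile just as well, which is why this pattern is easy to miss in a code sketch that omits the topology setup.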



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)