You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Nishkam Ravi (JIRA)" <ji...@apache.org> on 2017/06/27 21:04:00 UTC
[jira] [Created] (KAFKA-5528) Error while reading topic, offset,
partition info from process method
Nishkam Ravi created KAFKA-5528:
-----------------------------------
Summary: Error while reading topic, offset, partition info from process method
Key: KAFKA-5528
URL: https://issues.apache.org/jira/browse/KAFKA-5528
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Nishkam Ravi
We are encountering an IllegalStateException while trying to access context.topic() from process function. The code is written in Scala and is being launched using sbt (spring isn't involved). Here's the code sketch:
class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
private var hsmClient: HSMClient = _
override def init(processorContext: ProcessorContext): Unit = {
super.init(processorContext)
hsmClient = HSMClient(config).getOrElse(null)
}
override def process(key: Array[Byte], value: Array[Byte]): Unit = {
val topic: String = this.context.topic()
partition: Int = this.context.partition()
val offset: Long = this.context.offset()
val timestamp: Long = this.context.timestamp()
// business logic
}
}
The exception is thrown only for the multi-consumer case (when number of partitions for a topic > 1 and parallelism > 1).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)