Posted to dev@samza.apache.org by Michael Ravits <mi...@gmail.com> on 2016/09/06 19:50:13 UTC

kafka.common.UnknownException

Hi,

We are getting the error pasted at the bottom of this mail in a Samza
task. The task never recovers and keeps printing messages like these in the log:

2016-07-27 17:46:20 BrokerProxy [DEBUG] Removed [event,1]
2016-07-27 17:46:20 KafkaSystemConsumer [INFO] Abdicating for [event,1]
2016-07-27 17:46:20 KafkaSystemConsumer [INFO] Refreshing brokers for: Map([event,1] -> 146737816)
2016-07-27 17:46:20 BrokerProxy [DEBUG] Adding new topic and partition [event,1] to queue for dockerc3.adjs.net
2016-07-27 17:46:20 GetOffset [INFO] Validating offset 146737816 for topic and partition [event,1]
2016-07-27 17:46:20 GetOffset [INFO] Able to successfully read from offset 146737816 for topic and partition [event,1]. Using it to instantiate consumer.
2016-07-27 17:46:20 BrokerProxy [DEBUG] Got offset 146737816 for new topic and partition [event,1].
2016-07-27 17:46:20 BrokerProxy [DEBUG] Tried to start an already started broker proxy (BrokerProxy for dockerc3.adjs.net:9092). Ignoring.
2016-07-27 17:46:20 KafkaSystemConsumer [DEBUG] Claimed topic-partition ([event,1]) for (BrokerProxy for dockerc3.adjs.net:9092)


Redeploying the task resolves the problem until the next time it happens.
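
To quantify how often each partition goes through this release-and-reclaim
cycle, the "Abdicating" lines can be counted per topic-partition. The following
is only a minimal sketch, assuming the log line format shown above; the
function name count_abdications and the log file name are hypothetical and are
not part of Samza.

```python
import re
from collections import Counter

# Matches lines such as:
#   2016-07-27 17:46:20 KafkaSystemConsumer [INFO] Abdicating for [event,1]
# The pattern is an assumption based on the log excerpt in this mail.
ABDICATE = re.compile(
    r"KafkaSystemConsumer \[INFO\] Abdicating for \[([^,\]]+),(\d+)\]"
)


def count_abdications(lines):
    """Count 'Abdicating' messages per (topic, partition) pair."""
    counts = Counter()
    for line in lines:
        match = ABDICATE.search(line)
        if match:
            topic, partition = match.group(1), int(match.group(2))
            counts[(topic, partition)] += 1
    return counts


if __name__ == "__main__":
    # "samza-container.log" is a placeholder path, not a real Samza default.
    with open("samza-container.log", encoding="utf-8") as log:
        for (topic, partition), n in count_abdications(log).most_common():
            print(f"{topic},{partition}: {n} abdications")
```

A steadily growing count for the same partition would suggest the consumer is
stuck in the restart loop rather than hitting an occasional transient error.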

We are on Java 1.8, Samza 0.9.0 and Kafka 0.9.

Could the problem be related to the difference in versions between the
Kafka consumer used in Samza and the version of the Kafka broker?

Could upgrading to Samza 0.10 help in this case?

Would appreciate any advice.

Thanks,
Michael

2016-07-27 17:25:04 BrokerProxy [WARN] Restarting consumer due to kafka.common.UnknownException. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.
2016-07-27 17:25:04 BrokerProxy [DEBUG] Exception detail:
kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:169)
    at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.earliestOrLatestOffset(DefaultFetchSimpleConsumer.scala:56)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$refreshLatencyMetrics$1.apply(BrokerProxy.scala:310)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$refreshLatencyMetrics$1.apply(BrokerProxy.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.samza.system.kafka.BrokerProxy.refreshLatencyMetrics(BrokerProxy.scala:308)
    at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:187)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132)
    at java.lang.Thread.run(Thread.java:745)
2016-07-27 17:25:04 BrokerProxy [DEBUG] Removed [event,3]