Posted to users@kafka.apache.org by Stuart Reynolds <st...@stureynolds.com> on 2015/03/09 21:56:15 UTC

kafka log ERROR Closing socket for IP -- Connection reset by peer

I'm calling ConsumerConnector.shutdown to close a consumer connection,
and Kafka's log reports an error.

I don't see a similar error when using SimpleConsumer.

Is there a way to close ConsumerConnector so that the errors aren't
reported in the Kafka log? This is making it very difficult to sift
through the log and find real errors.

I found this issue, but I didn't see a fix in it:
https://github.com/claudemamo/kafka-web-console/issues/37


Here's what the kafka log produces:

[2015-03-09 13:47:40,308] ERROR Closing socket for /172.18.251.1 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)


Here's how I'm using a consumer connector (in Scala). Did I forget
to close something?

  @Test
  def consumerCloseTest(): Unit = {
    val topic = "UserEvent"

    val consumerConfig =
      KafkaUtil.makeConsumerConfig("UserEventTest", testProperties)
    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
    try {
      // Read one item
      val numThreads: Int = 1
      val topicCountMap = Map[String, Int](topic -> numThreads)
      val topicMessageStreams: scala.collection.Map[String,
        List[KafkaStream[Array[Byte], Array[Byte]]]] =
        consumerConnector.createMessageStreams(topicCountMap)
      val streams = topicMessageStreams.get(topic)
      val stream = streams.get(0)
      val it = stream.iterator()

      while (it.hasNext()) {
        logger.info("DONE")
        return
      }
    } finally { consumerConnector.shutdown() }
  }