Posted to commits@kafka.apache.org by jo...@apache.org on 2014/01/14 03:16:45 UTC

git commit: KAFKA-1200: inconsistent log levels when consumed offset is reset; patch by Dima Pekar, reviewed by Joe Stein

Updated Branches:
  refs/heads/trunk d2e2c607d -> d401292ab


KAFKA-1200: inconsistent log levels when consumed offset is reset; patch by Dima Pekar, reviewed by Joe Stein


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d401292a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d401292a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d401292a

Branch: refs/heads/trunk
Commit: d401292abb0f3660895173d4613f712058ae097f
Parents: d2e2c60
Author: Joe Stein <jo...@stealth.ly>
Authored: Mon Jan 13 21:16:32 2014 -0500
Committer: Joe Stein <jo...@stealth.ly>
Committed: Mon Jan 13 21:16:32 2014 -0500

----------------------------------------------------------------------
 .../main/scala/kafka/server/AbstractFetcherThread.scala   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d401292a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index bb2dd90..db7017b 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s".format(fetchRequest), t)
+          error("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
@@ -134,7 +134,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       //    should get fixed in the subsequent fetches
-                      logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
+                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
@@ -143,16 +143,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)
                     partitionMap.put(topicAndPartition, newOffset)
-                    warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
                     case e: Throwable =>
-                      warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
                   if (isRunning.get) {
-                    warn("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                       ErrorMapping.exceptionFor(partitionData.error).getClass))
                     partitionsWithError += topicAndPartition
                   }
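----------------------------------------------------------------------

For readers skimming the diff: the patch changes no behavior, only
severity. Every branch above that skips invalid messages, resets a
consumed offset, or adds a partition to partitionsWithError now logs at
error instead of warn, so all failure paths in AbstractFetcherThread
report at the same level. Below is a minimal, self-contained Scala
sketch of that policy; the names (FetcherSketch, FetchResult,
handleResult) are hypothetical stand-ins for illustration, not Kafka's
actual classes.

// Minimal sketch of the post-patch logging policy (hypothetical names,
// not Kafka's actual API): every fetcher failure path logs at ERROR.
object FetcherSketch {

  // Stand-in logger; the real code uses Kafka's log4j-backed Logging trait.
  object Logger {
    def error(msg: String): Unit = println(s"ERROR $msg")
  }

  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical per-partition fetch outcomes, mirroring the diff's branches.
  sealed trait FetchResult
  case object Ok               extends FetchResult
  case object CorruptMessage   extends FetchResult
  case object OffsetOutOfRange extends FetchResult
  final case class OtherError(e: Throwable) extends FetchResult

  var partitionsWithError = Set.empty[TopicPartition]

  def handleResult(tp: TopicPartition, currentOffset: Long, result: FetchResult): Unit =
    result match {
      case Ok => ()
      case CorruptMessage =>
        // Mirrors the InvalidMessageException branch: report at ERROR but keep
        // fetching, so one bad message does not stall the other partitions.
        Logger.error(s"Found invalid messages during fetch for partition [${tp.topic},${tp.partition}] offset $currentOffset")
      case OffsetOutOfRange =>
        val newOffset = 0L // stand-in for handleOffsetOutOfRange(tp)
        // An offset reset is also surfaced at ERROR, matching the severity of
        // the other failure paths instead of the old WARN.
        Logger.error(s"Current offset $currentOffset for partition [${tp.topic},${tp.partition}] out of range; reset offset to $newOffset")
      case OtherError(e) =>
        Logger.error(s"Error for partition [${tp.topic},${tp.partition}]: ${e.getClass}")
        partitionsWithError += tp
    }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("payments", 0)
    handleResult(tp, 42L, CorruptMessage)
    handleResult(tp, 42L, OffsetOutOfRange)
    handleResult(tp, 42L, OtherError(new RuntimeException("leader moved")))
  }
}

Running the sketch prints one ERROR line per failure path, which is the
consistency the ticket title asks for.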