You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/10/24 19:47:21 UTC

svn commit: r1188255 - in /incubator/kafka/trunk/core/src/main/scala/kafka/server: KafkaServer.scala KafkaZooKeeper.scala

Author: jkreps
Date: Mon Oct 24 17:47:20 2011
New Revision: 1188255

URL: http://svn.apache.org/viewvc?rev=1188255&view=rev
Log:
KAFKA-150 Make the error message for duplicate node ids in zk more self-explanatory.


Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1188255&r1=1188254&r2=1188255&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Oct 24 17:47:20 2011
@@ -82,8 +82,7 @@ class KafkaServer(val config: KafkaConfi
     }
     catch {
       case e =>
-        logger.fatal(e)
-        logger.fatal(Utils.stackTrace(e))
+        logger.fatal("Fatal error during startup.", e)
         shutdown
     }
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1188255&r1=1188254&r2=1188255&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Oct 24 17:47:20 2011
@@ -21,6 +21,7 @@ import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.cluster.Broker
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.log.LogManager
 import java.net.InetAddress
@@ -52,7 +53,15 @@ class KafkaZooKeeper(config: KafkaConfig
     val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
     val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
-    ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    try {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    } catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + 
+                                   "indicates that you either have configured a brokerid that is already in use, or " + 
+                                   "else you have shutdown this broker and restarted it faster than the zookeeper " + 
+                                   "timeout so it appears to be re-registering.")
+    }
     logger.info("Registering broker " + brokerIdPath + " succeeded with " + broker)
   }