Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/21 01:26:42 UTC

svn commit: r1291536 - /incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala

Author: junrao
Date: Tue Feb 21 00:26:42 2012
New Revision: 1291536

URL: http://svn.apache.org/viewvc?rev=1291536&view=rev
Log:
A tool to UPDATE Zookeeper partition-offset with input from a file; patched by John Fung; reviewed by Jun Rao; KAFKA-255

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala?rev=1291536&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala Tue Feb 21 00:26:42 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+import joptsimple._
+import kafka.utils.{Logging, ZkUtils, ZKStringSerializer}
+import org.I0Itec.zkclient.ZkClient
+
+
+/**
+ *  A utility that updates consumer offsets for broker partitions in ZK.
+ *  
+ *  This utility takes the following arguments:
+ *  1. --zkconnect: the ZooKeeper connect string (defaults to localhost:2181)
+ *  2. --input-file: a file containing partition offset data such as:
+ *     (this data file can be produced by running kafka.tools.ExportZkOffsets)
+ *  
+ *     /consumers/group1/offsets/topic1/3-0:285038193
+ *     /consumers/group1/offsets/topic1/1-0:286894308
+ *     
+ *  To print debug messages, add the following line to log4j.properties:
+ *  log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG
+ *  (for Eclipse debugging, copy log4j.properties to the binary directory of "core", such as core/bin)
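+ *  
+ *  Example invocation (a sketch: the kafka-run-class.sh launcher and the /tmp path below are
+ *  illustrative assumptions, not part of this patch). Produce the offsets file with
+ *  kafka.tools.ExportZkOffsets, edit it to the desired offsets, then import it:
+ *  
+ *    ./bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect localhost:2181 --input-file /tmp/offsets.dat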
+ */
+object ImportZkOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+                            .withRequiredArg()
+                            .defaultsTo("localhost:2181")
+                            .ofType(classOf[String])
+    val inFileOpt = parser.accepts("input-file", "Input file")
+                            .withRequiredArg()
+                            .ofType(classOf[String])
+    parser.accepts("help", "Print this message.")
+            
+    val options = parser.parse(args : _*)
+    
+    if (options.has("help")) {
+       parser.printHelpOn(System.out)
+       System.exit(0)
+    }
+    
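+    // --input-file is the only required option; print usage and exit if it is missing.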
+    for (opt <- List(inFileOpt)) {
+      if (!options.has(opt)) {
+        System.err.println("Missing required argument: %s".format(opt))
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    
+    val zkConnect           = options.valueOf(zkConnectOpt)
+    val partitionOffsetFile = options.valueOf(inFileOpt)
+
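+    // Connect to ZooKeeper (30s session and connection timeouts) and push the offsets read from the input file.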
+    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+    val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+    updateZkOffsets(zkClient, partitionOffsets)
+  }
+
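+  // Reads the input file, one "zk-path:offset" pair per line (the format written by
+  // kafka.tools.ExportZkOffsets), and returns a map from ZooKeeper path to offset string.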
+  private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
+    val fr = new FileReader(filename)
+    val br = new BufferedReader(fr)
+    var partOffsetsMap: Map[String,String] = Map()
+    
+    var s: String = br.readLine()
+    while ( s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+      
+      partOffsetsMap += tokens(0) -> tokens(1)
+      debug("adding node path [" + s + "]")
+      
+      s = br.readLine()
+    }
+    
+    return partOffsetsMap
+  }
+  
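+  // Writes each offset value to its ZooKeeper path via ZkUtils.updatePersistentPath;
+  // a failure on one path is printed and the remaining entries are still processed.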
+  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
+    val cluster = ZkUtils.getCluster(zkClient)
+    var partitions: List[String] = Nil
+
+    for ((partition, offset) <- partitionOffsets) {
+      debug("updating [" + partition + "] with offset [" + offset + "]")
+      
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
+      } catch {
+        case e => e.printStackTrace()
+      }
+    }
+  }
+}