You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/21 01:26:42 UTC
svn commit: r1291536 -
/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
Author: junrao
Date: Tue Feb 21 00:26:42 2012
New Revision: 1291536
URL: http://svn.apache.org/viewvc?rev=1291536&view=rev
Log:
A tool to UPDATE Zookeeper partition-offset with input from a file; patched by John Fung; reviewed by Jun Rao; KAFKA-255
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala?rev=1291536&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala Tue Feb 21 00:26:42 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+import joptsimple._
+import kafka.utils.{Logging, ZkUtils,ZKStringSerializer}
+import org.I0Itec.zkclient.ZkClient
+
+
+/**
+ * A utility that updates the offset of broker partitions in ZK.
+ *
+ * This utility expects 2 input files as arguments:
+ * 1. consumer properties file
+ * 2. a file contains partition offsets data such as:
+ * (This output data file can be obtained by running kafka.tools.ExportZkOffsets)
+ *
+ * /consumers/group1/offsets/topic1/3-0:285038193
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ *
+ * To print debug message, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+object ImportZkOffsets extends Logging {
+
+ def main(args: Array[String]) {
+ val parser = new OptionParser
+
+ val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+ .withRequiredArg()
+ .defaultsTo("localhost:2181")
+ .ofType(classOf[String])
+ val inFileOpt = parser.accepts("input-file", "Input file")
+ .withRequiredArg()
+ .ofType(classOf[String])
+ parser.accepts("help", "Print this message.")
+
+ val options = parser.parse(args : _*)
+
+ if (options.has("help")) {
+ parser.printHelpOn(System.out)
+ System.exit(0)
+ }
+
+ for (opt <- List(inFileOpt)) {
+ if (!options.has(opt)) {
+ System.err.println("Missing required argument: %s".format(opt))
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val zkConnect = options.valueOf(zkConnectOpt)
+ val partitionOffsetFile = options.valueOf(inFileOpt)
+
+ val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+ updateZkOffsets(zkClient, partitionOffsets)
+ }
+
+ private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
+ val fr = new FileReader(filename)
+ val br = new BufferedReader(fr)
+ var partOffsetsMap: Map[String,String] = Map()
+
+ var s: String = br.readLine()
+ while ( s != null && s.length() >= 1) {
+ val tokens = s.split(":")
+
+ partOffsetsMap += tokens(0) -> tokens(1)
+ debug("adding node path [" + s + "]")
+
+ s = br.readLine()
+ }
+
+ return partOffsetsMap
+ }
+
+ private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
+ val cluster = ZkUtils.getCluster(zkClient)
+ var partitions: List[String] = Nil
+
+ for ((partition, offset) <- partitionOffsets) {
+ debug("updating [" + partition + "] with offset [" + offset + "]")
+
+ try {
+ ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
+ } catch {
+ case e => e.printStackTrace()
+ }
+ }
+ }
+}