Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/27 00:31:27 UTC

git commit: KAFKA-1618 Exception thrown when running console producer with no port number for the broker; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 9c17747ba -> a9c7d7724


KAFKA-1618 Exception thrown when running console producer with no port number for the broker; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9c7d772
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9c7d772
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9c7d772

Branch: refs/heads/trunk
Commit: a9c7d77249224666b1e5fcded213c516fdd46ac0
Parents: 9c17747
Author: BalajiSeshadri <ba...@dish.com>
Authored: Fri Sep 26 15:30:47 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Sep 26 15:30:49 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleProducer.scala     | 11 +++---
 .../main/scala/kafka/tools/GetOffsetShell.scala |  6 ++--
 .../scala/kafka/tools/ProducerPerformance.scala |  3 +-
 .../scala/kafka/tools/ReplayLogProducer.scala   |  3 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |  4 ++-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  4 ++-
 .../src/main/scala/kafka/utils/ToolsUtils.scala | 38 ++++++++++++++++++++
 7 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
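For context, every tool touched by this patch follows the same pattern: read the --broker-list option, run it through the new ToolsUtils.validatePortOrDie, and only then hand it to ClientUtils.parseBrokerList. Below is a minimal sketch of that wiring as a stand-alone tool; the object name and option definition are illustrative only and not taken from any one of the files in this diff.

    import joptsimple.OptionParser
    import kafka.client.ClientUtils
    import kafka.utils.ToolsUtils

    object BrokerListCheckSketch {
      def main(args: Array[String]): Unit = {
        val parser = new OptionParser
        // Illustrative option; the real tools define their own descriptions and defaults.
        val brokerListOpt = parser.accepts("broker-list", "host1:port1,host2:port2 list of brokers")
          .withRequiredArg
          .describedAs("broker-list")
          .ofType(classOf[String])
        val options = parser.parse(args: _*)

        val brokerList = options.valueOf(brokerListOpt)
        // Prints the parser's usage message and exits if any entry lacks a ":<digit>" port part.
        ToolsUtils.validatePortOrDie(parser, brokerList)
        // Safe to parse host:port pairs only after validation.
        val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
        println(metadataTargetBrokers)
      }
    }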


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index da4dad4..8e9ba0b 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
-import kafka.utils.CommandLineUtils
+import kafka.utils.{ToolsUtils, CommandLineUtils}
 import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
 
 import java.util.Properties
@@ -129,24 +129,24 @@ object ConsoleProducer {
       .defaultsTo(3)
     val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
       .withRequiredArg
-      .ofType(classOf[java.lang.Long])
+      .ofType(classOf[java.lang.Integer])
       .defaultsTo(100)
     val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
       " a message will queue awaiting suffient batch size. The value is given in ms.")
       .withRequiredArg
       .describedAs("timeout_ms")
-      .ofType(classOf[java.lang.Long])
+      .ofType(classOf[java.lang.Integer])
       .defaultsTo(1000)
     val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
       " messages will queue awaiting suffient batch size.")
       .withRequiredArg
       .describedAs("queue_size")
-      .ofType(classOf[java.lang.Long])
+      .ofType(classOf[java.lang.Integer])
       .defaultsTo(10000)
     val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
       .withRequiredArg
       .describedAs("queue enqueuetimeout ms")
-      .ofType(classOf[java.lang.Long])
+      .ofType(classOf[java.lang.Integer])
       .defaultsTo(Int.MaxValue)
     val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
       .withRequiredArg
@@ -220,6 +220,7 @@ object ConsoleProducer {
     val useNewProducer = options.has(useNewProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser,brokerList)
     val sync = options.has(syncOpt)
     val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
     val compressionCodec = if (options.has(compressionCodecOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..3d9293e 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -23,7 +23,7 @@ import joptsimple._
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
-import kafka.utils.CommandLineUtils
+import kafka.utils.{ToolsUtils, CommandLineUtils}
 
 
 object GetOffsetShell {
@@ -66,7 +66,9 @@ object GetOffsetShell {
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
 
     val clientId = "GetOffsetShell"
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+    val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser, brokerList)
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topic = options.valueOf(topicOpt)
     var partitionList = options.valueOf(partitionOpt)
     var time = options.valueOf(timeOpt).longValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index fc3e724..f61c7c7 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
 import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
+import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils}
 import kafka.message.CompressionCodec
 import kafka.serializer._
 
@@ -132,6 +132,7 @@ object ProducerPerformance extends Logging {
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
     val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser,brokerList)
     val messageSize = options.valueOf(messageSizeOpt).intValue
     var isFixedSize = !options.has(varyMessageSizeOpt)
     var isSync = options.has(syncOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 69be31c..3393a3d 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -21,7 +21,7 @@ import joptsimple.OptionParser
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
 import kafka.consumer._
-import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
 import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
@@ -110,6 +110,7 @@ object ReplayLogProducer extends Logging {
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser,brokerList)
     val numMessages = options.valueOf(numMessagesOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val inputTopic = options.valueOf(inputTopicOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index af47836..ba6ddd7 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -116,7 +116,9 @@ object ReplicaVerificationTool extends Logging {
     val reportInterval = options.valueOf(reportIntervalOpt).longValue
     // getting topic metadata
     info("Getting topic metatdata...")
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+    val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser,brokerList)
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
     val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..b4f903b 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -124,7 +124,9 @@ object SimpleConsumerShell extends Logging {
 
     // getting topic metadata
     info("Getting topic metatdata...")
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+    val brokerList = options.valueOf(brokerListOpt)
+    ToolsUtils.validatePortOrDie(parser,brokerList)
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
new file mode 100644
index 0000000..cfbf279
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.utils
+
+import joptsimple.OptionParser
+import scala.util.matching.Regex
+
+object ToolsUtils {
+
+  def validatePortOrDie(parser: OptionParser, hostPort: String) = {
+    val regex = new Regex(":[0-9]")
+    val hostPorts: Array[String] = if(hostPort.contains(','))
+      hostPort.split(",")
+    else
+      Array(hostPort)
+    val validHostPort = hostPorts.filter {
+      hostPortData =>
+        regex.findAllMatchIn(hostPortData).size > 0
+    }
+    val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
+    if(!isValid)
+      CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
+  }
+}
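The regex in validatePortOrDie only requires each comma-separated entry to contain a colon followed by a single digit; it is unanchored, so it rejects a bare hostname but does not verify that the full port value is numeric. A minimal usage sketch follows; the object name and entry point are purely illustrative, and note that the second call terminates the JVM via CommandLineUtils.printUsageAndDie.

    import joptsimple.OptionParser
    import kafka.utils.ToolsUtils

    object ValidatePortSketch {
      def main(args: Array[String]): Unit = {
        val parser = new OptionParser
        // Passes: every comma-separated entry contains ":<digit>".
        ToolsUtils.validatePortOrDie(parser, "host1:9091,host2:9092")
        // Dies with the usage message: the second entry has no port.
        ToolsUtils.validatePortOrDie(parser, "host1:9091,host2")
      }
    }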