You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/27 00:31:27 UTC
git commit: KAFKA-1618 Exception thrown when running console producer with no port number for the broker; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 9c17747ba -> a9c7d7724
KAFKA-1618 Exception thrown when running console producer with no port number for the broker; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9c7d772
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9c7d772
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9c7d772
Branch: refs/heads/trunk
Commit: a9c7d77249224666b1e5fcded213c516fdd46ac0
Parents: 9c17747
Author: BalajiSeshadri <ba...@dish.com>
Authored: Fri Sep 26 15:30:47 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Sep 26 15:30:49 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/tools/ConsoleProducer.scala | 11 +++---
.../main/scala/kafka/tools/GetOffsetShell.scala | 6 ++--
.../scala/kafka/tools/ProducerPerformance.scala | 3 +-
.../scala/kafka/tools/ReplayLogProducer.scala | 3 +-
.../kafka/tools/ReplicaVerificationTool.scala | 4 ++-
.../scala/kafka/tools/SimpleConsumerShell.scala | 4 ++-
.../src/main/scala/kafka/utils/ToolsUtils.scala | 38 ++++++++++++++++++++
7 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index da4dad4..8e9ba0b 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,7 +20,7 @@ package kafka.tools
import kafka.common._
import kafka.message._
import kafka.serializer._
-import kafka.utils.CommandLineUtils
+import kafka.utils.{ToolsUtils, CommandLineUtils}
import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
import java.util.Properties
@@ -129,24 +129,24 @@ object ConsoleProducer {
.defaultsTo(3)
val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
.withRequiredArg
- .ofType(classOf[java.lang.Long])
+ .ofType(classOf[java.lang.Integer])
.defaultsTo(100)
val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
" a message will queue awaiting suffient batch size. The value is given in ms.")
.withRequiredArg
.describedAs("timeout_ms")
- .ofType(classOf[java.lang.Long])
+ .ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
" messages will queue awaiting suffient batch size.")
.withRequiredArg
.describedAs("queue_size")
- .ofType(classOf[java.lang.Long])
+ .ofType(classOf[java.lang.Integer])
.defaultsTo(10000)
val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
.withRequiredArg
.describedAs("queue enqueuetimeout ms")
- .ofType(classOf[java.lang.Long])
+ .ofType(classOf[java.lang.Integer])
.defaultsTo(Int.MaxValue)
val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
.withRequiredArg
@@ -220,6 +220,7 @@ object ConsoleProducer {
val useNewProducer = options.has(useNewProducerOpt)
val topic = options.valueOf(topicOpt)
val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser,brokerList)
val sync = options.has(syncOpt)
val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
val compressionCodec = if (options.has(compressionCodecOpt))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..3d9293e 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -23,7 +23,7 @@ import joptsimple._
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
-import kafka.utils.CommandLineUtils
+import kafka.utils.{ToolsUtils, CommandLineUtils}
object GetOffsetShell {
@@ -66,7 +66,9 @@ object GetOffsetShell {
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
val clientId = "GetOffsetShell"
- val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+ val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser, brokerList)
+ val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index fc3e724..f61c7c7 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -19,7 +19,7 @@ package kafka.tools
import kafka.metrics.KafkaMetricsReporter
import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
+import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils}
import kafka.message.CompressionCodec
import kafka.serializer._
@@ -132,6 +132,7 @@ object ProducerPerformance extends Logging {
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser,brokerList)
val messageSize = options.valueOf(messageSizeOpt).intValue
var isFixedSize = !options.has(varyMessageSizeOpt)
var isSync = options.has(syncOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 69be31c..3393a3d 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -21,7 +21,7 @@ import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.consumer._
-import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
import kafka.api.OffsetRequest
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
@@ -110,6 +110,7 @@ object ReplayLogProducer extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser,brokerList)
val numMessages = options.valueOf(numMessagesOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val inputTopic = options.valueOf(inputTopicOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index af47836..ba6ddd7 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -116,7 +116,9 @@ object ReplicaVerificationTool extends Logging {
val reportInterval = options.valueOf(reportIntervalOpt).longValue
// getting topic metadata
info("Getting topic metatdata...")
- val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+ val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser,brokerList)
+ val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..b4f903b 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -124,7 +124,9 @@ object SimpleConsumerShell extends Logging {
// getting topic metadata
info("Getting topic metatdata...")
- val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+ val brokerList = options.valueOf(brokerListOpt)
+ ToolsUtils.validatePortOrDie(parser,brokerList)
+ val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9c7d772/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
new file mode 100644
index 0000000..cfbf279
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.utils
+
+import joptsimple.OptionParser
+import scala.util.matching.Regex
+
+object ToolsUtils {
+
+ def validatePortOrDie(parser: OptionParser, hostPort: String) = {
+ val regex = new Regex(":[0-9]")
+ val hostPorts: Array[String] = if(hostPort.contains(','))
+ hostPort.split(",")
+ else
+ Array(hostPort)
+ val validHostPort = hostPorts.filter {
+ hostPortData =>
+ regex.findAllMatchIn(hostPortData).size > 0
+ }
+ val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
+ if(!isValid)
+ CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
+ }
+}