You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/22 21:36:13 UTC

kafka git commit: MINOR: Fix ClassCastException in ConsumerGroupCommand

Repository: kafka
Updated Branches:
  refs/heads/trunk f4a1ca347 -> 71a65d95a


MINOR: Fix ClassCastException in ConsumerGroupCommand

Offset and partition are not correctly converted from
String to Long and Int, respectively.

Running the command line tool with the --from-file option
causes the following exception:

java.lang.ClassCastException: java.lang.String cannot be
cast to java.lang.Integer

Reason: asInstanceOf was used for the conversion. It is a
type cast, not a parse, so it fails at runtime when the
value is a String.

Also, the unit test was passing --to-earliest and --from-file
together when executing the reset. In that case only the
--to-earliest option is applied and the --from-file
option is ignored. Since the preparation part also used
--to-earliest to create the file, the unit test
passed without actually testing the --from-file option.
Fixed the unit test too.

Author: Erkan Unal <eu...@cisco.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3938 from eu657/eu657-patch-1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71a65d95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71a65d95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71a65d95

Branch: refs/heads/trunk
Commit: 71a65d95a257d98925b5f0f6f2227504cf5043a2
Parents: f4a1ca3
Author: Erkan Unal <eu...@cisco.com>
Authored: Fri Sep 22 22:34:05 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 22 22:34:11 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/ConsumerGroupCommand.scala |  4 ++--
 .../unit/kafka/admin/ResetConsumerGroupOffsetTest.scala   | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71a65d95/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d42a062..21f23b7 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -566,8 +566,8 @@ object ConsumerGroupCommand extends Logging {
       resetPlanCsv.split("\n")
         .map { line =>
           val Array(topic, partition, offset) = line.split(",").map(_.trim)
-          val topicPartition = new TopicPartition(topic, partition.asInstanceOf[Int])
-          val offsetAndMetadata = new OffsetAndMetadata(offset.asInstanceOf[Long])
+          val topicPartition = new TopicPartition(topic, partition.toInt)
+          val offsetAndMetadata = new OffsetAndMetadata(offset.toLong)
           (topicPartition, offsetAndMetadata)
         }.toMap
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/71a65d95/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6594b6e..6853b16 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -544,7 +544,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsExportImportPlan() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--export")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset","2", "--export")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -559,18 +559,18 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       val bw = new BufferedWriter(new FileWriter(file))
       bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset))
       bw.close()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } && file.exists()
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } && file.exists()
     }, "Expected the consume all messages and save reset offsets plan to file")
 
 
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--from-file", file.getCanonicalPath)
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath)
     val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
     val consumerGroupCommandExec = createConsumerGroupService(optsExec)
 
     TestUtils.waitUntilTrue(() => {
         val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
-        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to offset 0 (earliest) by file.")
+        assignmentsToReset.exists { assignment => assignment._2.offset() == 2 }
+    }, "Expected the consumer group to reset to offset 2 according to the plan in the file.")
 
     file.deleteOnExit()