You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/03/04 19:11:31 UTC

[kafka] branch 2.2 updated: KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 72478d5  KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
72478d5 is described below

commit 72478d5f1095941c3005ff2880d7bd2d554f9c8c
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue Mar 5 00:38:15 2019 +0530

    KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
    
    Port 22 is used by ssh, which causes the AdminClient to throw an OOM:
    
    > java.lang.OutOfMemoryError: Java heap space
    > 	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    > 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    > 	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    > 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    > 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    > 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    > 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:640)
    > 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:561)
    > 	at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
    > 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    > 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140)
    > 	at java.lang.Thread.run(Thread.java:748)
    >
    >
    
    Author: Manikumar Reddy <ma...@gmail.com>
    Author: Ismael Juma <is...@juma.me.uk>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
    
    Closes #6360 from omkreddy/KAFKA-7312
    
    (cherry picked from commit 9ee5f920d5e4b837c3240dff948e120aaef7cd23)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 .../scala/integration/kafka/api/AdminClientIntegrationTest.scala    | 6 +++---
 core/src/test/scala/unit/kafka/utils/TestUtils.scala                | 4 ++++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 8356ad5..e740283 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1045,7 +1045,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testForceClose(): Unit = {
     val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     client = AdminClient.create(config)
     // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
     // cancelled by the close operation.
@@ -1062,7 +1062,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testMinimumRequestTimeouts(): Unit = {
     val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
     client = AdminClient.create(config)
     val startTimeMs = Time.SYSTEM.milliseconds()
@@ -1070,7 +1070,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       new CreateTopicsOptions().timeoutMs(2)).all()
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
     val endTimeMs = Time.SYSTEM.milliseconds()
-    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ecdb8c8..c582bc5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -71,6 +71,10 @@ object TestUtils extends Logging {
   /* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */
   val RandomPort = 0
 
+  /* Incorrect broker port which can used by kafka clients in tests. This port should not be used
+   by any other service and hence we use a reserved port. */
+  val IncorrectBrokerPort = 225
+
   /** Port to use for unit tests that mock/don't require a real ZK server. */
   val MockZkPort = 1
   /** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */