You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/03/04 19:11:51 UTC
[kafka] branch 2.1 updated: KAFKA-7312: Change broker port used in
testMinimumRequestTimeouts and testForceClose
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 9b28e96 KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
9b28e96 is described below
commit 9b28e96a1942cddefdb0db7276f73e12d0b162e6
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue Mar 5 00:38:15 2019 +0530
KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
Port 22 is used by ssh, which causes the AdminClient to throw an OOM:
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:640)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:561)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
> at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140)
> at java.lang.Thread.run(Thread.java:748)
>
>
Author: Manikumar Reddy <ma...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #6360 from omkreddy/KAFKA-7312
(cherry picked from commit 9ee5f920d5e4b837c3240dff948e120aaef7cd23)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
.../scala/integration/kafka/api/AdminClientIntegrationTest.scala | 6 +++---
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 4 ++++
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index ca23e1f..58de351 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1054,7 +1054,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testForceClose(): Unit = {
val config = createConfig()
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
client = AdminClient.create(config)
// Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
// cancelled by the close operation.
@@ -1071,7 +1071,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testMinimumRequestTimeouts(): Unit = {
val config = createConfig()
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
client = AdminClient.create(config)
val startTimeMs = Time.SYSTEM.milliseconds()
@@ -1079,7 +1079,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
new CreateTopicsOptions().timeoutMs(2)).all()
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
val endTimeMs = Time.SYSTEM.milliseconds()
- assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+ assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs)
}
/**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9d5906f..b4576b3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -71,6 +71,10 @@ object TestUtils extends Logging {
/* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */
val RandomPort = 0
+ /* Incorrect broker port which can be used by Kafka clients in tests. This port should not be used
+ by any other service and hence we use a reserved port. */
+ val IncorrectBrokerPort = 225
+
/** Port to use for unit tests that mock/don't require a real ZK server. */
val MockZkPort = 1
/** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */