Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/03 20:04:02 UTC

kafka git commit: KAFKA-2857; Retry querying the consumer group while initializing

Repository: kafka
Updated Branches:
  refs/heads/trunk a3c45b0c9 -> 573a6f398


KAFKA-2857; Retry querying the consumer group while initializing

This applies to new-consumer-based groups and avoids scenarios in which a user issues a `--describe` query while the group is still initializing.
Example: The following could occur for a newly created group.
```
kafkakafka:~/workspace/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Error: Executing consumer group command failed due to The group coordinator is not available.
```

With this PR the group is queried repeatedly at fixed intervals, up to a preset (and configurable) timeout (`group-init-timeout`, exposed on the command line as the `--timeout` option), to avoid unfortunate situations like the one above.
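
For example, the describe command can then be given a larger window in which to wait for the group to stabilize; the 30000 ms value below is illustrative, and the option defaults to 5000 ms:
```
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g --timeout 30000
```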

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2538 from vahidhashemian/KAFKA-2857


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/573a6f39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/573a6f39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/573a6f39

Branch: refs/heads/trunk
Commit: 573a6f39863061a6f38a0aca35f11470c3e8538e
Parents: a3c45b0
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Mar 3 11:22:42 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Mar 3 12:01:38 2017 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  48 +++++--
 .../kafka/admin/ConsumerGroupCommand.scala      |  17 ++-
 .../main/scala/kafka/tools/StreamsResetter.java |   2 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala | 133 ++++++++-----------
 .../integration/ResetIntegrationTest.java       |   2 +-
 5 files changed, 110 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
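
For readers skimming the diff below, here is a minimal, self-contained Scala sketch of the retry-until-timeout pattern the patch adds around the coordinator lookup and the describe-group request. It is an illustration only, not the patch code; names such as `retryUntilReady` do not appear in the commit.
```
// Illustrative sketch of the retry loop added to AdminClient.findCoordinator
// and AdminClient.describeConsumerGroup (not the actual patch code).
object RetryUntilReady {

  def retryUntilReady[T](timeoutMs: Long, backoffMs: Long)(attempt: () => T)(isReady: T => Boolean): T = {
    val start = System.currentTimeMillis
    var result = attempt()
    // Keep retrying while the response is not yet usable and the deadline has
    // not passed, mirroring the while loops introduced by the patch.
    while (!isReady(result) && System.currentTimeMillis - start < timeoutMs) {
      Thread.sleep(backoffMs)
      result = attempt()
    }
    if (!isReady(result))
      // The patch throws org.apache.kafka.common.errors.TimeoutException here.
      throw new RuntimeException("Timed out while waiting for the group to initialize")
    result
  }

  def main(args: Array[String]): Unit = {
    // Toy usage: pretend the group becomes describable after ~300 ms.
    val readyAt = System.currentTimeMillis + 300
    val state = retryUntilReady(timeoutMs = 5000, backoffMs = 100)(() =>
      if (System.currentTimeMillis >= readyAt) "Stable" else "Initializing"
    )(_ == "Stable")
    println(s"Group state: $state")
  }
}
```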


http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7cfc91a..4b28460 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,18 +16,21 @@ import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
-import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import kafka.common.KafkaException
 import kafka.coordinator.GroupOverview
 import kafka.utils.Logging
+
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
+import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
@@ -37,6 +40,7 @@ import scala.util.Try
 
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
+                  val retryBackoffMs: Int,
                   val client: ConsumerNetworkClient,
                   val bootstrapBrokers: List[Node]) extends Logging {
 
@@ -66,9 +70,19 @@ class AdminClient(val time: Time,
     throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
   }
 
-  def findCoordinator(groupId: String): Node = {
+  def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
+    val startTime = time.milliseconds
     val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
-    val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+    var response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+
+    while (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
+      Thread.sleep(retryBackoffMs)
+      response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+    }
+
+    if (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
+
     response.error.maybeThrow()
     response.node
   }
@@ -165,18 +179,34 @@ class AdminClient(val time: Time,
                                   consumers: Option[List[ConsumerSummary]],
                                   coordinator: Node)
 
-  def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
-    val coordinator = findCoordinator(groupId)
+  def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = {
     val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
         new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
     val response = responseBody.asInstanceOf[DescribeGroupsResponse]
     val metadata = response.groups.get(groupId)
     if (metadata == null)
       throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
-    if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
-      throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
+    metadata
+  }
+
+  def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
+
+    def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
+      metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+
+    val startTime = time.milliseconds
+    val coordinator = findCoordinator(groupId, timeoutMs)
+    var metadata = describeConsumerGroupHandler(coordinator, groupId)
+
+    while (!isValidConsumerGroupResponse(metadata) && time.milliseconds - startTime < timeoutMs) {
+      debug(s"The consumer group response for group '$groupId' is invalid. Retrying the request as the group is initializing ...")
+      Thread.sleep(retryBackoffMs)
+      metadata = describeConsumerGroupHandler(coordinator, groupId)
+    }
+
+    if (!isValidConsumerGroupResponse(metadata))
+      throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
 
-    metadata.error.maybeThrow()
     val consumers = metadata.members.asScala.map { consumer =>
       ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
         case "Stable" =>
@@ -204,6 +234,7 @@ object AdminClient {
   val DefaultSendBufferBytes = 128 * 1024
   val DefaultReceiveBufferBytes = 32 * 1024
   val DefaultRetryBackoffMs = 100
+
   val AdminClientIdSequence = new AtomicInteger(1)
   val AdminConfigDef = {
     val config = new ConfigDef()
@@ -274,6 +305,7 @@ object AdminClient {
     new AdminClient(
       time,
       DefaultRequestTimeoutMs,
+      DefaultRetryBackoffMs,
       highLevelClient,
       bootstrapCluster.nodes.asScala.toList)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 11f4f89..caad62a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -396,7 +396,7 @@ object ConsumerGroupCommand extends Logging {
     }
 
     protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
-      val consumerGroupSummary = adminClient.describeConsumerGroup(group)
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
       (Some(consumerGroupSummary.state),
         consumerGroupSummary.consumers match {
           case None =>
@@ -502,7 +502,11 @@ object ConsumerGroupCommand extends Logging {
       "for every consumer group. For instance --topic t1" + nl +
       "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
     val NewConsumerDoc = "Use new consumer. This is the default."
+    val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
+      "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
+      "or is going through some changes)."
     val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
+
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
                              .withRequiredArg
@@ -524,6 +528,11 @@ object ConsumerGroupCommand extends Logging {
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
     val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
+    val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
+                             .withRequiredArg
+                             .describedAs("timeout (ms)")
+                             .ofType(classOf[Long])
+                             .defaultsTo(5000)
     val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
                                   .withRequiredArg
                                   .describedAs("command config property file")
@@ -531,11 +540,15 @@ object ConsumerGroupCommand extends Logging {
     val options = parser.parse(args : _*)
 
     val useOldConsumer = options.has(zkConnectOpt)
+    val describeOptPresent = options.has(describeOpt)
 
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
 
     def checkArgs() {
       // check required args
+      if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer))
+        debug(s"Option '$timeoutMsOpt' is applicable only when both '$bootstrapServerOpt' and '$describeOpt' are used.")
+
       if (useOldConsumer) {
         if (options.has(bootstrapServerOpt))
           CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not valid with '$zkConnectOpt'.")
@@ -550,7 +563,7 @@ object ConsumerGroupCommand extends Logging {
             "committed offset for that group expires.")
       }
 
-      if (options.has(describeOpt))
+      if (describeOptPresent)
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
       if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a61c092..83166cd 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -91,7 +91,7 @@ public class StreamsResetter {
 
             adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
             final String groupId = options.valueOf(applicationIdOption);
-            if (!adminClient.describeConsumerGroup(groupId).consumers().get().isEmpty()) {
+            if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
                     "Make sure to stop all running application instances before running the reset tool.");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 8e10a87..905d113 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -23,6 +23,7 @@ import java.util.Collections
 import java.util.Properties
 
 import org.easymock.EasyMock
+import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
 
@@ -35,10 +36,11 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.clients.consumer.KafkaConsumer
 
 
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
@@ -179,21 +181,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
-    TestUtils.waitUntilTrue(() => {
-        try {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          println(state == Some("Dead") && assignments == Some(List()))
-          state == Some("Dead") && assignments == Some(List())
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            e.printStackTrace()
-            throw e
-        }
-      }, "Expected the state to be 'Dead' with no members in the group.")
-
+    val (state, assignments) = consumerGroupCommand.describeGroup()
+    assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
     consumerGroupCommand.close()
   }
 
@@ -207,21 +196,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        try {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          state == Some("Stable") &&
-          assignments.isDefined &&
-          assignments.get.count(_.group == group) == 1 &&
-          assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-          assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-          assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            throw e
-        }
+        val (state, assignments) = consumerGroupCommand.describeGroup()
+        state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
       }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
 
     consumerGroupCommand.close()
@@ -237,40 +218,24 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        try {
-          val (state, _) = consumerGroupCommand.describeGroup()
-          state == Some("Stable")
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            throw e
-        }
+        val (state, _) = consumerGroupCommand.describeGroup()
+        state == Some("Stable")
       }, "Expected the group to initially become stable.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-        try {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          state == Some("Empty") &&
-          assignments.isDefined &&
-          assignments.get.count(_.group == group) == 1 &&
-          assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
-          assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-          assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            throw e
-        } finally {
-          consumerGroupCommand.close()
-        }
+        val (state, assignments) = consumerGroupCommand.describeGroup()
+        state == Some("Empty") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+        assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
       }, "Expected no active member in describe group results.")
+
+    consumerGroupCommand.close()
   }
 
   @Test
@@ -283,20 +248,12 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        try {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          state == Some("Stable") &&
-          assignments.isDefined &&
-          assignments.get.count(_.group == group) == 2 &&
-          assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
-          assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            throw e
-        }
+        val (state, assignments) = consumerGroupCommand.describeGroup()
+        state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 2 &&
+        assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
+        assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
       }, "Expected rows for consumers with no assigned partitions in describe group results.")
 
     consumerGroupCommand.close()
@@ -315,24 +272,40 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        try {
           val (state, assignments) = consumerGroupCommand.describeGroup()
           state == Some("Stable") &&
           assignments.isDefined &&
           assignments.get.count(_.group == group) == 2 &&
           assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
           assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
-        } catch {
-          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
-            // Do nothing while the group initializes
-            false
-          case e: Throwable =>
-            throw e
-        }
       }, "Expected two rows (one row per consumer) in describe group results.")
 
     consumerGroupCommand.close()
   }
+
+  @Test
+  def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "10")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    try {
+      val (state, assignments) = consumerGroupCommand.describeGroup()
+      fail("The consumer group command should fail due to low initialization timeout")
+    } catch {
+      case e: TimeoutException =>
+        // OK
+      case e: Throwable =>
+        fail("An unexpected exception occurred: " + e.getMessage)
+        throw e
+    } finally {
+      consumerGroupCommand.close()
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 3248b2a..4804bfb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -421,7 +421,7 @@ public class ResetIntegrationTest {
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeConsumerGroup(APP_ID + testNo).consumers().get().isEmpty();
+            return adminClient.describeConsumerGroup(APP_ID + testNo, 0).consumers().get().isEmpty();
         }
     }