You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/08/25 19:27:48 UTC

[1/2] impala git commit: [DOCS] Known Issues for 3.1 (WIP)

Repository: impala
Updated Branches:
  refs/heads/master b206aeb71 -> a8f8c8d6f


[DOCS] Known Issues for 3.1 (WIP)

Change-Id: I247438f28835c1986deca39e98cd7deb4dc20351
Reviewed-on: http://gerrit.cloudera.org:8080/11323
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Alex Rodoni <ar...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/71b36fe0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/71b36fe0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/71b36fe0

Branch: refs/heads/master
Commit: 71b36fe08bedb2d9129e2d8d0560f0bc0a055e3e
Parents: b206aeb
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Fri Aug 24 15:25:50 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Fri Aug 24 22:33:37 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_known_issues.xml | 166 -------------------------------
 1 file changed, 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/71b36fe0/docs/topics/impala_known_issues.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_known_issues.xml b/docs/topics/impala_known_issues.xml
index 5b02538..662132a 100644
--- a/docs/topics/impala_known_issues.xml
+++ b/docs/topics/impala_known_issues.xml
@@ -154,116 +154,6 @@ under the License.
 
     </concept>
 
-    <concept id="IMPALA-3316">
-
-      <title>Slow queries for Parquet tables with convert_legacy_hive_parquet_utc_timestamps=true</title>
-
-      <conbody>
-
-        <p>
-          The configuration setting
-          <codeph>convert_legacy_hive_parquet_utc_timestamps=true</codeph> uses an underlying
-          function that can be a bottleneck on high volume, highly concurrent queries due to the
-          use of a global lock while loading time zone information. This bottleneck can cause
-          slowness when querying Parquet tables, up to 30x for scan-heavy queries. The amount of
-          slowdown depends on factors such as the number of cores and number of threads involved
-          in the query.
-        </p>
-
-        <note>
-          <p>
-            The slowdown only occurs when accessing <codeph>TIMESTAMP</codeph> columns within
-            Parquet files that were generated by Hive, and therefore require the on-the-fly
-            timezone conversion processing.
-          </p>
-        </note>
-
-        <p>
-          <b>Bug:</b> <xref keyref="IMPALA-3316">IMPALA-3316</xref>
-        </p>
-
-        <p>
-          <b>Severity:</b> High
-        </p>
-
-        <p>
-          <b>Workaround:</b>Store the <codeph>TIMESTAMP</codeph> values as
-          strings in one of the following formats:
-          <ul>
-            <li><codeph>yyyy-MM-dd</codeph></li>
-            <li><codeph>yyyy-MM-dd HH:mm:ss</codeph></li>
-            <li><codeph>yyyy-MM-dd HH:mm:ss.SSSSSSSSS</codeph>
-              <p>The date can
-                have the 1-9 digits in the fractional part.
-              </p>
-            </li>
-          </ul>
-          Impala implicitly converts such string values to
-            <codeph>TIMESTAMP</codeph> in calls to date/time functions.
-        </p>
-
-      </conbody>
-
-    </concept>
-
-    <concept id="ki_file_handle_cache">
-
-      <title>Interaction of File Handle Cache with HDFS Appends and Short-Circuit Reads</title>
-
-      <conbody>
-
-        <p>
-          If a data file used by Impala is being continuously appended or overwritten in place
-          by an HDFS mechanism, such as <cmdname>hdfs dfs -appendToFile</cmdname>, interaction
-          with the file handle caching feature in <keyword keyref="impala210_full"/> and higher
-          could cause short-circuit reads to sometimes be disabled on some DataNodes. When a
-          mismatch is detected between the cached file handle and a data block that was
-          rewritten because of an append, short-circuit reads are turned off on the affected
-          host for a 10-minute period.
-        </p>
-
-        <p>
-          The possibility of encountering such an issue is the reason why the file handle
-          caching feature is currently turned off by default. See
-          <xref keyref="scalability_file_handle_cache"/> for information about this feature and
-          how to enable it.
-        </p>
-
-        <p>
-          <b>Bug:</b>
-          <xref href="https://issues.apache.org/jira/browse/HDFS-12528"
-            scope="external" format="html">HDFS-12528</xref>
-        </p>
-
-        <p>
-          <b>Severity:</b> High
-        </p>
-
-        <p>
-          <b>Workaround:</b> Verify whether your ETL process is susceptible to this issue before
-          enabling the file handle caching feature. You can set the <cmdname>impalad</cmdname>
-          configuration option <codeph>unused_file_handle_timeout_sec</codeph> to a time period
-          that is shorter than the HDFS setting
-          <codeph>dfs.client.read.shortcircuit.streams.cache.expiry.ms</codeph>. (Keep in mind
-          that the HDFS setting is in milliseconds while the Impala setting is in seconds.)
-        </p>
-
-        <p>
-          <b>Resolution:</b> Fixed in HDFS 2.10 and higher. Use the new HDFS parameter
-          <codeph>dfs.domain.socket.disable.interval.seconds</codeph> to specify the amount of
-          time that short circuit reads are disabled on encountering an error. The default value
-          is 10 minutes (<codeph>600</codeph> seconds). It is recommended that you set
-          <codeph>dfs.domain.socket.disable.interval.seconds</codeph> to a small value, such as
-          <codeph>1</codeph> second, when using the file handle cache. Setting <codeph>
-          dfs.domain.socket.disable.interval.seconds</codeph> to <codeph>0</codeph> is not
-          recommended as a non-zero interval protects the system if there is a persistent
-          problem with short circuit reads.
-        </p>
-
-      </conbody>
-
-    </concept>
-
   </concept>
 
 <!--<concept id="known_issues_usability"><title id="ki_usability">Impala Known Issues: Usability</title><conbody><p> These issues affect the convenience of interacting directly with Impala, typically through the Impala shell or Hue. </p></conbody></concept>-->
@@ -572,62 +462,6 @@ explain SELECT 1 FROM alltypestiny a1
 
     </concept>
 
-    <concept id="IMPALA-3006" rev="IMPALA-3006">
-
-      <title>Impala may use incorrect bit order with BIT_PACKED encoding</title>
-
-      <conbody>
-
-        <p>
-          Parquet <codeph>BIT_PACKED</codeph> encoding as implemented by Impala is LSB first.
-          The parquet standard says it is MSB first.
-        </p>
-
-        <p>
-          <b>Bug:</b> <xref keyref="IMPALA-3006">IMPALA-3006</xref>
-        </p>
-
-        <p>
-          <b>Severity:</b> High, but rare in practice because BIT_PACKED is infrequently used,
-          is not written by Impala, and is deprecated in Parquet 2.0.
-        </p>
-
-      </conbody>
-
-    </concept>
-
-    <concept id="IMPALA-3082" rev="IMPALA-3082">
-
-      <title>BST between 1972 and 1995</title>
-
-      <conbody>
-
-        <p>
-          The calculation of start and end times for the BST (British Summer Time) time zone
-          could be incorrect between 1972 and 1995. Between 1972 and 1995, BST began and ended
-          at 02:00 GMT on the third Sunday in March (or second Sunday when Easter fell on the
-          third) and fourth Sunday in October. For example, both function calls should return
-          13, but actually return 12, in a query such as:
-        </p>
-
-<codeblock>
-select
-  extract(from_utc_timestamp(cast('1970-01-01 12:00:00' as timestamp), 'Europe/London'), "hour") summer70start,
-  extract(from_utc_timestamp(cast('1970-12-31 12:00:00' as timestamp), 'Europe/London'), "hour") summer70end;
-</codeblock>
-
-        <p>
-          <b>Bug:</b> <xref keyref="IMPALA-3082">IMPALA-3082</xref>
-        </p>
-
-        <p>
-          <b>Severity:</b> High
-        </p>
-
-      </conbody>
-
-    </concept>
-
     <concept id="IMPALA-2422" rev="IMPALA-2422">
 
       <title>% escaping does not work correctly when occurs at the end in a LIKE clause</title>


[2/2] impala git commit: IMPALA-7476: Use context to limit scope of StatestoreSubscriber

Posted by jo...@apache.org.
IMPALA-7476: Use context to limit scope of StatestoreSubscriber

StatestoreSubscribers are outliving the end of
test_statestore.py, causing continued interaction with
the statestored with a lot of log spew in statestored.INFO
and the main test log.

This change converts StatestoreSubscriber to use a
Python context manager. It updates all the tests to
use this context manager. This ensures that the
StatestoreSubscriber runs kill() at the end of
each test.

There is also a circular reference between
StatestoreSubscriber and KillableThreadedServer that
prevents cleanup. This resolves the circular reference
by nulling out some variables on
KillableThreadedServer::shutdown().

Testing:
 - Verified log spew is gone
 - StatestoreSubscriber::__del__ is called for 16 out
   of 17 tests (exception: test_hung_heartbeat) where
   before it was called for none

Change-Id: Ie71fac6095cd94343f68b586238aed410ebbca39
Reviewed-on: http://gerrit.cloudera.org:8080/11326
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a8f8c8d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a8f8c8d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a8f8c8d6

Branch: refs/heads/master
Commit: a8f8c8d6f58d750c6949ff89e6f88f608d1ce462
Parents: 71b36fe
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Aug 23 15:24:09 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Sat Aug 25 19:26:34 2018 +0000

----------------------------------------------------------------------
 tests/statestore/test_statestore.py | 262 ++++++++++++++++---------------
 1 file changed, 137 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a8f8c8d6/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index d45deeb..8f26b63 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -104,6 +104,9 @@ class KillableThreadedServer(TServer):
     self.is_shutdown = True
     self.serverTransport.close()
     self.wait_until_down()
+    # The processor contains a reference to a StatestoreSubscriber. Clean up that
+    # reference to avoid a circular reference that would prevent object deletion.
+    self.processor = None
 
   def wait_until_up(self, num_tries=10):
     for i in xrange(num_tries):
@@ -180,6 +183,13 @@ class StatestoreSubscriber(object):
     self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
     self.exception = None
 
+  def __enter__(self):
+    return self
+
+  def __exit__(self, *args):
+    self.kill()
+    self.wait_for_failure()
+
   def Heartbeat(self, args):
     """Heartbeat RPC handler. Calls heartbeat callback if one exists."""
     self.heartbeat_event.acquire()
@@ -241,8 +251,10 @@ class StatestoreSubscriber(object):
   def kill(self):
     """Closes both the server and client sockets, and waits for the server to become
     unavailable"""
-    self.client_transport.close()
-    self.server.shutdown()
+    if self.client_transport:
+      self.client_transport.close()
+    if self.server:
+      self.server.shutdown()
     return self
 
   def start(self):
@@ -335,17 +347,17 @@ class TestStatestore():
   def test_registration_ids_different(self):
     """Test that if a subscriber with the same id registers twice, the registration ID is
     different"""
-    sub = StatestoreSubscriber()
-    sub.start().register()
-    old_reg_id = sub.registration_id
-    sub.register()
-    assert old_reg_id != sub.registration_id
+    with StatestoreSubscriber() as sub:
+      sub.start().register()
+      old_reg_id = sub.registration_id
+      sub.register()
+      assert old_reg_id != sub.registration_id
 
   def test_receive_heartbeats(self):
     """Smoke test to confirm that heartbeats get sent to a correctly registered
     subscriber"""
-    sub = StatestoreSubscriber()
-    sub.start().register().wait_for_heartbeat(5)
+    with StatestoreSubscriber() as sub:
+      sub.start().register().wait_for_heartbeat(5)
 
   def test_receive_updates(self):
     """Test that updates are correctly received when a subscriber alters a topic"""
@@ -370,13 +382,13 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=topic_update_correct)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_filter_prefix(self):
     topic_name = "topic_delta_%s" % uuid.uuid1()
@@ -418,14 +430,14 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=topic_update_correct)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
-                             filter_prefix="bar")
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 5)
-    )
+    with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
+                               filter_prefix="bar")
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 5)
+      )
 
   def test_update_is_delta(self):
     """Test that the 'is_delta' flag is correctly set. The first update for a topic should
@@ -452,13 +464,13 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=check_delta)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=check_delta) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_skipped(self):
     """Test that skipping an update causes it to be resent"""
@@ -478,39 +490,39 @@ class TestStatestore():
       assert len(args.topic_deltas[topic_name].topic_entries) == 1
       return TUpdateStateResponse(status=STATUS_OK, skipped=True)
 
-    sub = StatestoreSubscriber(update_cb=check_skipped)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=check_skipped) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_failure_detected(self):
-    sub = StatestoreSubscriber()
-    topic_name = "test_failure_detected"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 1)
-         .kill()
-         .wait_for_failure()
-    )
+    with StatestoreSubscriber() as sub:
+      topic_name = "test_failure_detected"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 1)
+           .kill()
+           .wait_for_failure()
+       )
 
   def test_hung_heartbeat(self):
     """Test for IMPALA-1712: If heartbeats hang (which we simulate by sleeping for five
     minutes) the statestore should time them out every 3s and then eventually fail after
     40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
-    sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300))
-    topic_name = "test_hung_heartbeat"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 1)
-         .wait_for_failure(timeout=60)
-    )
+    with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) as sub:
+      topic_name = "test_hung_heartbeat"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 1)
+           .wait_for_failure(timeout=60)
+       )
 
   def test_topic_persistence(self):
     """Test that persistent topic entries survive subscriber failure, but transent topic
@@ -551,23 +563,23 @@ class TestStatestore():
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
            TTopicRegistration(topic_name=transient_topic_name, is_transient=True)]
 
-    sub = StatestoreSubscriber(update_cb=add_entries)
-    (
-      sub.start()
-         .register(topics=reg)
-         .wait_for_update(persistent_topic_name, 2)
-         .wait_for_update(transient_topic_name, 2)
-         .kill()
-         .wait_for_failure()
-    )
-
-    sub2 = StatestoreSubscriber(update_cb=check_entries)
-    (
-      sub2.start()
-          .register(topics=reg)
-          .wait_for_update(persistent_topic_name, 1)
-          .wait_for_update(transient_topic_name, 1)
-    )
+    with StatestoreSubscriber(update_cb=add_entries) as sub:
+      (
+        sub.start()
+           .register(topics=reg)
+           .wait_for_update(persistent_topic_name, 2)
+           .wait_for_update(transient_topic_name, 2)
+           .kill()
+           .wait_for_failure()
+      )
+
+    with StatestoreSubscriber(update_cb=check_entries) as sub2:
+      (
+         sub2.start()
+             .register(topics=reg)
+             .wait_for_update(persistent_topic_name, 1)
+             .wait_for_update(transient_topic_name, 1)
+       )
 
   def test_update_with_clear_entries_flag(self):
     """Test that the statestore clears all topic entries when a subscriber
@@ -598,47 +610,47 @@ class TestStatestore():
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=topic_name, is_transient=False)]
-    sub1 = StatestoreSubscriber(update_cb=add_entries)
-    (
-      sub1.start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 1)
-        .kill()
-        .wait_for_failure()
-        .start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 2)
-    )
-
-    sub2 = StatestoreSubscriber(update_cb=check_entries)
-    (
-      sub2.start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 2)
-    )
+    with StatestoreSubscriber(update_cb=add_entries) as sub1:
+      (
+        sub1.start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 1)
+            .kill()
+            .wait_for_failure()
+            .start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 2)
+      )
+
+    with StatestoreSubscriber(update_cb=check_entries) as sub2:
+      (
+        sub2.start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 2)
+      )
 
   def test_heartbeat_failure_reset(self):
     """Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID
     should be reset when it resubscribes, not after the first successful heartbeat. Delay
     the heartbeat to force the topic update to finish first."""
 
-    sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5))
-    topic_name = "test_heartbeat_failure_reset"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    sub.start()
-    sub.register(topics=[reg])
-    LOG.info("Registered with id {0}".format(sub.subscriber_id))
-    sub.wait_for_heartbeat(1)
-    sub.kill()
-    LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
-    sub.wait_for_failure()
-    # IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to
-    # be received to confirm that the subsequent updates are being scheduled repeatedly.
-    target_updates = sub.update_counts[topic_name] + 5
-    sub.start()
-    sub.register(topics=[reg])
-    LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
-    sub.wait_for_update(topic_name, target_updates)
+    with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5)) as sub:
+      topic_name = "test_heartbeat_failure_reset"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      sub.start()
+      sub.register(topics=[reg])
+      LOG.info("Registered with id {0}".format(sub.subscriber_id))
+      sub.wait_for_heartbeat(1)
+      sub.kill()
+      LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
+      sub.wait_for_failure()
+      # IMPALA-6785 caused only one topic update to be sent. Wait for multiple updates to
+      # be received to confirm that the subsequent updates are being scheduled repeatedly.
+      target_updates = sub.update_counts[topic_name] + 5
+      sub.start()
+      sub.register(topics=[reg])
+      LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
+      sub.wait_for_update(topic_name, target_updates)
 
   def test_min_subscriber_topic_version(self):
     self._do_test_min_subscriber_topic_version(False)
@@ -716,19 +728,19 @@ class TestStatestore():
     # version, the other which just consumes the updates.
     def producer_callback(sub, args): return callback(sub, args, True, "producer")
     def consumer_callback(sub, args): return callback(sub, args, False, "consumer")
-    consumer_sub = StatestoreSubscriber(update_cb=consumer_callback)
-    consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    producer_sub = StatestoreSubscriber(update_cb=producer_callback)
-    producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
-        populate_min_subscriber_topic_version=True)
-    NUM_UPDATES = 6
-    (
-      consumer_sub.start()
-          .register(topics=[consumer_reg])
-    )
-    (
-      producer_sub.start()
-          .register(topics=[producer_reg])
-          .wait_for_update(topic_name, NUM_UPDATES)
-    )
-    consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
+    with StatestoreSubscriber(update_cb=consumer_callback) as consumer_sub:
+      with StatestoreSubscriber(update_cb=producer_callback) as producer_sub:
+        consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+        producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
+            populate_min_subscriber_topic_version=True)
+        NUM_UPDATES = 6
+        (
+          consumer_sub.start()
+                      .register(topics=[consumer_reg])
+        )
+        (
+          producer_sub.start()
+                      .register(topics=[producer_reg])
+                      .wait_for_update(topic_name, NUM_UPDATES)
+        )
+        consumer_sub.wait_for_update(topic_name, NUM_UPDATES)