You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/08/25 19:27:49 UTC

[2/2] impala git commit: IMPALA-7476: Use context to limit scope of StatestoreSubscriber

IMPALA-7476: Use context to limit scope of StatestoreSubscriber

StatestoreSubscribers are outliving the end of
test_statestore.py, so they keep interacting with the
statestored and produce a lot of log spew in
statestored.INFO and the main test log.

This change converts StatestoreSubscriber to use a
Python context manager. It updates all the tests to
use this context manager. This ensures that the
StatestoreSubscriber runs kill() at the end of
each test.

There is also a circular reference between
StatestoreSubscriber and KillableThreadedServer that
prevents cleanup. This change breaks the circular
reference by setting the server's processor to None in
KillableThreadedServer::shutdown().

Testing:
 - Verified log spew is gone
 - StatestoreSubscriber::__del__ is called for 16 out
   of 17 tests (exception: test_hung_heartbeat), whereas
   previously it was called for none

Change-Id: Ie71fac6095cd94343f68b586238aed410ebbca39
Reviewed-on: http://gerrit.cloudera.org:8080/11326
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a8f8c8d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a8f8c8d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a8f8c8d6

Branch: refs/heads/master
Commit: a8f8c8d6f58d750c6949ff89e6f88f608d1ce462
Parents: 71b36fe
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Aug 23 15:24:09 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Sat Aug 25 19:26:34 2018 +0000

----------------------------------------------------------------------
 tests/statestore/test_statestore.py | 262 ++++++++++++++++---------------
 1 file changed, 137 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a8f8c8d6/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index d45deeb..8f26b63 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -104,6 +104,9 @@ class KillableThreadedServer(TServer):
     self.is_shutdown = True
     self.serverTransport.close()
     self.wait_until_down()
+    # The processor contains a reference to a StatestoreSubscriber. Clean up that
+    # reference to avoid a circular reference that would prevent object deletion.
+    self.processor = None
 
   def wait_until_up(self, num_tries=10):
     for i in xrange(num_tries):
@@ -180,6 +183,13 @@ class StatestoreSubscriber(object):
     self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
     self.exception = None
 
+  def __enter__(self):
+    return self
+
+  def __exit__(self, *args):
+    self.kill()
+    self.wait_for_failure()
+
   def Heartbeat(self, args):
     """Heartbeat RPC handler. Calls heartbeat callback if one exists."""
     self.heartbeat_event.acquire()
@@ -241,8 +251,10 @@ class StatestoreSubscriber(object):
   def kill(self):
     """Closes both the server and client sockets, and waits for the server to become
     unavailable"""
-    self.client_transport.close()
-    self.server.shutdown()
+    if self.client_transport:
+      self.client_transport.close()
+    if self.server:
+      self.server.shutdown()
     return self
 
   def start(self):
@@ -335,17 +347,17 @@ class TestStatestore():
   def test_registration_ids_different(self):
     """Test that if a subscriber with the same id registers twice, the registration ID is
     different"""
-    sub = StatestoreSubscriber()
-    sub.start().register()
-    old_reg_id = sub.registration_id
-    sub.register()
-    assert old_reg_id != sub.registration_id
+    with StatestoreSubscriber() as sub:
+      sub.start().register()
+      old_reg_id = sub.registration_id
+      sub.register()
+      assert old_reg_id != sub.registration_id
 
   def test_receive_heartbeats(self):
     """Smoke test to confirm that heartbeats get sent to a correctly registered
     subscriber"""
-    sub = StatestoreSubscriber()
-    sub.start().register().wait_for_heartbeat(5)
+    with StatestoreSubscriber() as sub:
+      sub.start().register().wait_for_heartbeat(5)
 
   def test_receive_updates(self):
     """Test that updates are correctly received when a subscriber alters a topic"""
@@ -370,13 +382,13 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=topic_update_correct)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_filter_prefix(self):
     topic_name = "topic_delta_%s" % uuid.uuid1()
@@ -418,14 +430,14 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=topic_update_correct)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
-                             filter_prefix="bar")
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 5)
-    )
+    with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
+                               filter_prefix="bar")
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 5)
+      )
 
   def test_update_is_delta(self):
     """Test that the 'is_delta' flag is correctly set. The first update for a topic should
@@ -452,13 +464,13 @@ class TestStatestore():
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
-    sub = StatestoreSubscriber(update_cb=check_delta)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=check_delta) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_skipped(self):
     """Test that skipping an update causes it to be resent"""
@@ -478,39 +490,39 @@ class TestStatestore():
       assert len(args.topic_deltas[topic_name].topic_entries) == 1
       return TUpdateStateResponse(status=STATUS_OK, skipped=True)
 
-    sub = StatestoreSubscriber(update_cb=check_skipped)
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 3)
-    )
+    with StatestoreSubscriber(update_cb=check_skipped) as sub:
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 3)
+      )
 
   def test_failure_detected(self):
-    sub = StatestoreSubscriber()
-    topic_name = "test_failure_detected"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 1)
-         .kill()
-         .wait_for_failure()
-    )
+    with StatestoreSubscriber() as sub:
+      topic_name = "test_failure_detected"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 1)
+           .kill()
+           .wait_for_failure()
+       )
 
   def test_hung_heartbeat(self):
     """Test for IMPALA-1712: If heartbeats hang (which we simulate by sleeping for five
     minutes) the statestore should time them out every 3s and then eventually fail after
     40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
-    sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300))
-    topic_name = "test_hung_heartbeat"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    (
-      sub.start()
-         .register(topics=[reg])
-         .wait_for_update(topic_name, 1)
-         .wait_for_failure(timeout=60)
-    )
+    with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) as sub:
+      topic_name = "test_hung_heartbeat"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 1)
+           .wait_for_failure(timeout=60)
+       )
 
   def test_topic_persistence(self):
     """Test that persistent topic entries survive subscriber failure, but transent topic
@@ -551,23 +563,23 @@ class TestStatestore():
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
            TTopicRegistration(topic_name=transient_topic_name, is_transient=True)]
 
-    sub = StatestoreSubscriber(update_cb=add_entries)
-    (
-      sub.start()
-         .register(topics=reg)
-         .wait_for_update(persistent_topic_name, 2)
-         .wait_for_update(transient_topic_name, 2)
-         .kill()
-         .wait_for_failure()
-    )
-
-    sub2 = StatestoreSubscriber(update_cb=check_entries)
-    (
-      sub2.start()
-          .register(topics=reg)
-          .wait_for_update(persistent_topic_name, 1)
-          .wait_for_update(transient_topic_name, 1)
-    )
+    with StatestoreSubscriber(update_cb=add_entries) as sub:
+      (
+        sub.start()
+           .register(topics=reg)
+           .wait_for_update(persistent_topic_name, 2)
+           .wait_for_update(transient_topic_name, 2)
+           .kill()
+           .wait_for_failure()
+      )
+
+    with StatestoreSubscriber(update_cb=check_entries) as sub2:
+      (
+         sub2.start()
+             .register(topics=reg)
+             .wait_for_update(persistent_topic_name, 1)
+             .wait_for_update(transient_topic_name, 1)
+       )
 
   def test_update_with_clear_entries_flag(self):
     """Test that the statestore clears all topic entries when a subscriber
@@ -598,47 +610,47 @@ class TestStatestore():
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=topic_name, is_transient=False)]
-    sub1 = StatestoreSubscriber(update_cb=add_entries)
-    (
-      sub1.start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 1)
-        .kill()
-        .wait_for_failure()
-        .start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 2)
-    )
-
-    sub2 = StatestoreSubscriber(update_cb=check_entries)
-    (
-      sub2.start()
-        .register(topics=reg)
-        .wait_for_update(topic_name, 2)
-    )
+    with StatestoreSubscriber(update_cb=add_entries) as sub1:
+      (
+        sub1.start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 1)
+            .kill()
+            .wait_for_failure()
+            .start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 2)
+      )
+
+    with StatestoreSubscriber(update_cb=check_entries) as sub2:
+      (
+        sub2.start()
+            .register(topics=reg)
+            .wait_for_update(topic_name, 2)
+      )
 
   def test_heartbeat_failure_reset(self):
     """Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID
     should be reset when it resubscribes, not after the first successful heartbeat. Delay
     the heartbeat to force the topic update to finish first."""
 
-    sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5))
-    topic_name = "test_heartbeat_failure_reset"
-    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    sub.start()
-    sub.register(topics=[reg])
-    LOG.info("Registered with id {0}".format(sub.subscriber_id))
-    sub.wait_for_heartbeat(1)
-    sub.kill()
-    LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
-    sub.wait_for_failure()
-    # IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to
-    # be received to confirm that the subsequent updates are being scheduled repeatedly.
-    target_updates = sub.update_counts[topic_name] + 5
-    sub.start()
-    sub.register(topics=[reg])
-    LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
-    sub.wait_for_update(topic_name, target_updates)
+    with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5)) as sub:
+      topic_name = "test_heartbeat_failure_reset"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      sub.start()
+      sub.register(topics=[reg])
+      LOG.info("Registered with id {0}".format(sub.subscriber_id))
+      sub.wait_for_heartbeat(1)
+      sub.kill()
+      LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
+      sub.wait_for_failure()
+      # IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to
+      # be received to confirm that the subsequent updates are being scheduled repeatedly.
+      target_updates = sub.update_counts[topic_name] + 5
+      sub.start()
+      sub.register(topics=[reg])
+      LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
+      sub.wait_for_update(topic_name, target_updates)
 
   def test_min_subscriber_topic_version(self):
     self._do_test_min_subscriber_topic_version(False)
@@ -716,19 +728,19 @@ class TestStatestore():
     # version, the other which just consumes the updates.
     def producer_callback(sub, args): return callback(sub, args, True, "producer")
     def consumer_callback(sub, args): return callback(sub, args, False, "consumer")
-    consumer_sub = StatestoreSubscriber(update_cb=consumer_callback)
-    consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
-    producer_sub = StatestoreSubscriber(update_cb=producer_callback)
-    producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
-        populate_min_subscriber_topic_version=True)
-    NUM_UPDATES = 6
-    (
-      consumer_sub.start()
-          .register(topics=[consumer_reg])
-    )
-    (
-      producer_sub.start()
-          .register(topics=[producer_reg])
-          .wait_for_update(topic_name, NUM_UPDATES)
-    )
-    consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
+    with StatestoreSubscriber(update_cb=consumer_callback) as consumer_sub:
+      with StatestoreSubscriber(update_cb=producer_callback) as producer_sub:
+        consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+        producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
+            populate_min_subscriber_topic_version=True)
+        NUM_UPDATES = 6
+        (
+          consumer_sub.start()
+                      .register(topics=[consumer_reg])
+        )
+        (
+          producer_sub.start()
+                      .register(topics=[producer_reg])
+                      .wait_for_update(topic_name, NUM_UPDATES)
+        )
+        consumer_sub.wait_for_update(topic_name, NUM_UPDATES)