You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/13 21:46:32 UTC
[beam] Diff for: [GitHub] aaltay closed pull request #7492: Deflake
statesampler_test
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index dfb4bc10a913..1ff0c5ca6f5d 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -24,6 +24,9 @@
import unittest
from builtins import range
+from tenacity import retry
+from tenacity import stop_after_attempt
+
from apache_beam.runners.worker import statesampler
from apache_beam.utils.counters import CounterFactory
from apache_beam.utils.counters import CounterName
@@ -31,33 +34,41 @@
class StateSamplerTest(unittest.TestCase):
+ # Due to somewhat non-deterministic nature of state sampling and sleep,
+ # this test is flaky when state duraiton is low.
+ # Since increasing state duration significantly would also slow down
+ # the test suite, we are retrying twice on failure as a mitigation.
+ @retry(reraise=True, stop=stop_after_attempt(3))
def test_basic_sampler(self):
# Set up state sampler.
counter_factory = CounterFactory()
sampler = statesampler.StateSampler('basic', counter_factory,
sampling_period_ms=1)
+ # Duration of the fastest state. Total test duration is 6 times longer.
+ state_duration_ms = 1000
+ margin_of_error = 0.25
# Run basic workload transitioning between 3 states.
sampler.start()
with sampler.scoped_state('step1', 'statea'):
- time.sleep(0.1)
+ time.sleep(state_duration_ms / 1000)
self.assertEqual(
sampler.current_state().name,
CounterName(
'statea-msecs', step_name='step1', stage_name='basic'))
with sampler.scoped_state('step1', 'stateb'):
- time.sleep(0.2 / 2)
+ time.sleep(state_duration_ms / 1000)
self.assertEqual(
sampler.current_state().name,
CounterName(
'stateb-msecs', step_name='step1', stage_name='basic'))
with sampler.scoped_state('step1', 'statec'):
- time.sleep(0.3)
+ time.sleep(3 * state_duration_ms / 1000)
self.assertEqual(
sampler.current_state().name,
CounterName(
'statec-msecs', step_name='step1', stage_name='basic'))
- time.sleep(0.2 / 2)
+ time.sleep(state_duration_ms / 1000)
sampler.stop()
sampler.commit_counters()
@@ -68,9 +79,12 @@ def test_basic_sampler(self):
# Test that sampled state timings are close to their expected values.
expected_counter_values = {
- CounterName('statea-msecs', step_name='step1', stage_name='basic'): 100,
- CounterName('stateb-msecs', step_name='step1', stage_name='basic'): 200,
- CounterName('statec-msecs', step_name='step1', stage_name='basic'): 300,
+ CounterName('statea-msecs', step_name='step1', stage_name='basic'):
+ state_duration_ms,
+ CounterName('stateb-msecs', step_name='step1', stage_name='basic'):
+ 2 * state_duration_ms,
+ CounterName('statec-msecs', step_name='step1', stage_name='basic'):
+ 3 * state_duration_ms,
}
for counter in counter_factory.get_counters():
self.assertIn(counter.name, expected_counter_values)
@@ -78,8 +92,8 @@ def test_basic_sampler(self):
actual_value = counter.value()
deviation = float(abs(actual_value - expected_value)) / expected_value
logging.info('Sampling deviation from expectation: %f', deviation)
- self.assertGreater(actual_value, expected_value * 0.75)
- self.assertLess(actual_value, expected_value * 1.25)
+ self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
+ self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
def test_sampler_transition_overhead(self):
# Set up state sampler.
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 807ff09ac5fe..4e2c761185f9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -133,10 +133,11 @@ def get_version():
REQUIRED_TEST_PACKAGES = [
'nose>=1.3.7',
+ 'numpy>=1.14.3,<2',
'pandas>=0.23.4,<0.24',
'parameterized>=0.6.0,<0.7.0',
- 'numpy>=1.14.3,<2',
'pyhamcrest>=1.9,<2.0',
+ 'tenacity>=5.0.2,<6.0',
]
GCP_REQUIREMENTS = [
With regards,
Apache Git Services