You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/16 11:06:55 UTC
incubator-ariatosca git commit: tests fix
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-79-concurrent-storage-modifications 1b03bf211 -> b77070899
tests fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b7707089
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b7707089
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b7707089
Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: b77070899e9f2113973d9a65fdc9a95b35d996fb
Parents: 1b03bf2
Author: mxmrlv <mx...@gmail.com>
Authored: Thu Feb 16 13:06:35 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Thu Feb 16 13:06:35 2017 +0200
----------------------------------------------------------------------
aria/orchestrator/workflows/executor/process.py | 5 ++
...process_executor_concurrent_modifications.py | 74 ++++++++------------
2 files changed, 34 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b7707089/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 319982e..2c563a8 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -366,6 +366,7 @@ def _main():
storage_type.remove_mutable_association_listener()
with instrumentation.track_changes() as instrument:
+ # import pydevd; pydevd.settrace('localhost')
try:
ctx = serialization.operation_context_from_dict(context_dict)
_patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
@@ -376,6 +377,10 @@ def _main():
task_func(ctx=ctx, **operation_inputs)
messenger.succeeded(tracked_changes=instrument.tracked_changes)
except BaseException as e:
+ # import traceback
+ # with open('/home/maxim/Desktop/tmp_log', 'wr+') as f:
+ # traceback.print_exc(file=f)
+
messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b7707089/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 7c54bc5..e46921e 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import time
import json
@@ -33,34 +32,34 @@ from tests import mock
from tests import storage
-def test_concurrent_modification_on_task_succeeded(context, executor, shared_file):
- _test(context, executor, shared_file, _test_task_succeeded, expected_failure=True)
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
@operation
-def _test_task_succeeded(ctx, shared_file, key, first_value, second_value):
- _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value)
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+ _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
-def test_concurrent_modification_on_task_failed(context, executor, shared_file):
- _test(context, executor, shared_file, _test_task_failed, expected_failure=True)
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
@operation
-def _test_task_failed(ctx, shared_file, key, first_value, second_value):
- first = _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value)
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+ first = _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
if not first:
raise RuntimeError('MESSAGE')
-def test_concurrent_modification_on_update_and_refresh(context, executor, shared_file):
- _test(context, executor, shared_file, _test_update_and_refresh, expected_failure=False)
+def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
@operation
-def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value):
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
node_instance = ctx.node_instance
- first = _concurrent_update(shared_file, node_instance, key, first_value, second_value)
+ first = _concurrent_update(lock_files, node_instance, key, first_value, second_value)
if not first:
try:
ctx.model.node_instance.update(node_instance)
@@ -71,16 +70,15 @@ def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value):
raise RuntimeError('Unexpected')
-def _test(context, executor, shared_file, func, expected_failure):
+def _test(context, executor, lock_files, func, expected_failure):
def _node_instance(ctx):
return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
- shared_file.write(json.dumps({}))
key = 'key'
first_value = 'value1'
second_value = 'value2'
inputs = {
- 'shared_file': str(shared_file),
+ 'lock_files': lock_files,
'key': key,
'first_value': first_value,
'second_value': second_value
@@ -130,45 +128,31 @@ def context(tmpdir):
@pytest.fixture
-def shared_file(tmpdir):
- return tmpdir.join('shared_file')
-
+def lock_files(tmpdir):
+ return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
-def _concurrent_update(shared_file, node_instance, key, first_value, second_value):
- def lock():
- return fasteners.InterProcessLock(shared_file)
- def get(key):
- with open(shared_file) as f:
- return json.load(f).get(key)
+def _concurrent_update(lock_files, node_instance, key, first_value, second_value):
- def set(key):
- with open(shared_file) as f:
- content = json.load(f)
- content[key] = True
- with open(shared_file, 'wb') as f:
- json.dump(content, f)
+ locker1 = fasteners.InterProcessLock(lock_files[0])
+ locker2 = fasteners.InterProcessLock(lock_files[1])
- def wait_for(key):
- while True:
- time.sleep(0.01)
- with lock():
- if get(key):
- break
-
- with lock():
- first = not get('first_in')
- set('first_in' if first else 'second_in')
+ first = locker1.acquire(blocking=False)
if first:
- wait_for('second_in')
+ # Wait until the other process holds the second lock, so that both processes hold a lock
+ while locker2.acquire(blocking=False):
+ locker2.release()
+ time.sleep(0.1)
+ else:
+ locker2.acquire()
node_instance.runtime_properties[key] = first_value if first else second_value
if first:
- with lock():
- set('first_out')
+ locker1.release()
else:
- wait_for('first_out')
+ with locker1:
+ locker2.release()
return first