You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/08 10:26:53 UTC
incubator-ariatosca git commit: some rework
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-42-Generic-ctx-serialization-mechanism 6da1ba132 -> 77165a4fb
some rework
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/77165a4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/77165a4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/77165a4f
Branch: refs/heads/ARIA-42-Generic-ctx-serialization-mechanism
Commit: 77165a4fbc8fbb2e7612bbe816b45d764a07ab30
Parents: 6da1ba1
Author: mxmrlv <mx...@gmail.com>
Authored: Wed Feb 8 12:26:44 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Wed Feb 8 12:26:44 2017 +0200
----------------------------------------------------------------------
aria/orchestrator/context/operation.py | 27 ++++++++
aria/orchestrator/context/serialization.py | 73 --------------------
aria/orchestrator/runner.py | 19 ++---
aria/orchestrator/workflows/executor/process.py | 5 +-
aria/storage/core.py | 8 +++
.../workflows/executor/test_executor.py | 5 +-
.../workflows/executor/test_process_executor.py | 17 +++--
7 files changed, 57 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 23a6fd4..d1f61b2 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -17,6 +17,7 @@
Workflow and operation contexts
"""
+import aria
from aria.utils import file
from .common import BaseContext
@@ -73,6 +74,32 @@ class BaseOperationContext(BaseContext):
file.makedirs(plugin_workdir)
return plugin_workdir
+ @property
+ def serialization_dict(self):
+ context_cls = self.__class__
+ context_dict = {
+ 'name': self.name,
+ 'deployment_id': self._deployment_id,
+ 'task_id': self._task_id,
+ 'actor_id': self._actor_id,
+ 'workdir': self._workdir,
+ 'model_storage': self.model.serialization_dict if self.model else None,
+ 'resource_storage': self.resource.serialization_dict if self.resource else None
+ }
+ return {
+ 'context_cls': context_cls,
+ 'context': context_dict
+ }
+
+ @classmethod
+ def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
+ if model_storage:
+ model_storage = aria.application_model_storage(**model_storage)
+ if resource_storage:
+ resource_storage = aria.application_resource_storage(**resource_storage)
+
+ return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs)
+
class NodeOperationContext(BaseOperationContext):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/context/serialization.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py
deleted file mode 100644
index 705a63b..0000000
--- a/aria/orchestrator/context/serialization.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import aria
-
-
-def operation_context_to_dict(context):
- context_cls = context.__class__
- context_dict = {
- 'name': context.name,
- 'deployment_id': context._deployment_id,
- 'task_id': context._task_id,
- 'actor_id': context._actor_id,
- 'workdir': context._workdir
- }
- if context.model:
- model = context.model
- context_dict['model_storage'] = {
- 'api_cls': model.api,
- 'initiator': model._initiator,
- 'initiator_kwargs': model._initiator_kwargs,
- }
-
- else:
- context_dict['model_storage'] = None
- if context.resource:
- resource = context.resource
- context_dict['resource_storage'] = {
- 'api_cls': resource.api,
- 'initiator': resource._initiator,
- 'initiator_kwargs': resource._initiator_kwargs,
- }
- else:
- context_dict['resource_storage'] = None
- return {
- 'context_cls': context_cls,
- 'context': context_dict
- }
-
-
-def operation_context_from_dict(context_dict):
- context_cls = context_dict['context_cls']
- context = context_dict['context']
-
- model_storage = context['model_storage']
- if model_storage:
- api_cls = model_storage['api_cls']
- initiator_kwargs = model_storage['initiator_kwargs']
- init_func = model_storage['initiator']
- context['model_storage'] = aria.application_model_storage(
- api_cls, initiator_kwargs=initiator_kwargs, initiator=init_func)
-
- resource_storage = context['resource_storage']
- if resource_storage:
- api_cls = resource_storage['api_cls']
- initiator_kwargs = resource_storage['initiator_kwargs']
- init_func = resource_storage['initiator']
- context['resource_storage'] = aria.application_resource_storage(
- api=api_cls, initiator_kwargs=initiator_kwargs, initiator=init_func)
-
- return context_cls(**context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
index 6f998be..97fe410 100644
--- a/aria/orchestrator/runner.py
+++ b/aria/orchestrator/runner.py
@@ -17,22 +17,20 @@
Workflow runner
"""
-import platform
import tempfile
import os
-from sqlalchemy import (create_engine, orm) # @UnresolvedImport
-from sqlalchemy.pool import StaticPool # @UnresolvedImport
-
from .context.workflow import WorkflowContext
from .workflows.core.engine import Engine
from .workflows.executor.thread import ThreadExecutor
from ..storage import (
- model,
sql_mapi,
filesystem_rapi,
)
-from .. import (application_model_storage, application_resource_storage)
+from .. import (
+ application_model_storage,
+ application_resource_storage
+)
SQLITE_IN_MEMORY = 'sqlite:///:memory:'
@@ -98,12 +96,7 @@ class Runner(object):
task_retry_interval=1)
- @staticmethod
- def create_fs_resource_storage(directory='.'):
- return
-
def cleanup(self):
- if (self._is_storage_temporary and
- (self._storage_path is not None) and
- os.path.isfile(self._storage_path)):
+ if (self._is_storage_temporary and (self._storage_path is not None) and
+ os.path.isfile(self._storage_path)):
os.remove(self._storage_path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 23a4f52..c4b8ba1 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -46,7 +46,6 @@ from aria.extension import process_executor
from aria.utils import imports
from aria.utils import exceptions
from aria.orchestrator.workflows.executor import base
-from aria.orchestrator.context import serialization
from aria.storage import instrumentation
from aria.storage import type as storage_type
@@ -194,7 +193,7 @@ class ProcessExecutor(base.BaseExecutor):
'operation_mapping': task.operation_mapping,
'operation_inputs': task.inputs,
'port': self._server_port,
- 'context': serialization.operation_context_to_dict(task.context),
+ 'context': task.context.serialization_dict,
}
def _update_env(self, env, plugin_prefix):
@@ -328,7 +327,7 @@ def _main():
with instrumentation.track_changes() as instrument:
try:
- ctx = serialization.operation_context_from_dict(context_dict)
+ ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
_patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
task_func = imports.load_attribute(operation_mapping)
aria.install_aria_extensions()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index c45f2f2..4f7cd02 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -84,6 +84,14 @@ class Storage(LoggerMixin):
except KeyError:
return super(Storage, self).__getattribute__(item)
+ @property
+ def serialization_dict(self):
+ return {
+ 'api': self.api,
+ 'initiator': self._initiator,
+ 'initiator_kwargs': self._initiator_kwargs
+ }
+
def register(self, entry):
"""
Register the entry to the storage
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 2486a1e..0b33ae6 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -30,6 +30,7 @@ except ImportError:
from aria.storage import model
from aria.orchestrator import events
+from aria.orchestrator.context import operation
from aria.orchestrator.workflows.executor import (
thread,
process,
@@ -80,9 +81,9 @@ class MockException(Exception):
pass
-class MockContext(object):
+class MockContext(operation.BaseOperationContext):
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called
pass
def __getattr__(self, item):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 4972e61..9f0d1db 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -22,11 +22,16 @@ from contextlib import contextmanager
import pytest
from aria import application_model_storage
-from aria.storage import model as aria_model
+from aria.storage import (
+ model as aria_model,
+ sql_mapi
+)
+from aria.orchestrator import (
+ events,
+ plugin
+)
+from aria.orchestrator.context import operation
from aria.utils.plugin import create as create_plugin
-from aria.storage import sql_mapi
-from aria.orchestrator import events
-from aria.orchestrator import plugin
from aria.orchestrator.workflows.executor import process
@@ -107,9 +112,9 @@ def mock_plugin(plugin_manager, tmpdir):
return plugin_manager.install(source=plugin_path)
-class MockContext(object):
+class MockContext(operation.BaseOperationContext):
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called
pass
def __getattr__(self, item):