You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/27 15:16:36 UTC
[32/50] [abbrv] Renamed base module name to python_cartridgeagent
Started decrypt password test
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/test/__init__.py b/tools/python-cartridge-agent/test/__init__.py
deleted file mode 100644
index 13a8339..0000000
--- a/tools/python-cartridge-agent/test/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/asynctest.txt
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/test/asynctest.txt b/tools/python-cartridge-agent/test/asynctest.txt
deleted file mode 100644
index 623c418..0000000
--- a/tools/python-cartridge-agent/test/asynctest.txt
+++ /dev/null
@@ -1 +0,0 @@
-1413799652508.8130
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/test_util.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/test/test_util.py b/tools/python-cartridge-agent/test/test_util.py
deleted file mode 100644
index f62b2e8..0000000
--- a/tools/python-cartridge-agent/test/test_util.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from cartridgeagent.modules.util.asyncscheduledtask import *
-import time
-
-def test_async_task():
- test_task = TestTask()
- astask = ScheduledExecutor(2, test_task)
- start_time = time.time() * 1000
- astask.start()
- time.sleep(3)
- astask.terminate()
- f = open("asynctest.txt", "r")
- end_time = float(f.read())
- assert (end_time - start_time) >= 2 * 1000, "Task was executed before specified delay"
-
-
-class TestTask(AbstractAsyncScheduledTask):
-
- def execute_task(self):
- with open("asynctest.txt", "w") as f:
- f.seek(0)
- f.truncate()
- f.write("%1.4f" % (time.time()*1000))
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/__init__.py b/tools/python_cartridgeagent/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/__init__.py b/tools/python_cartridgeagent/cartridgeagent/__init__.py
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/agent.conf
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/agent.conf b/tools/python_cartridgeagent/cartridgeagent/agent.conf
new file mode 100644
index 0000000..5c087e9
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/agent.conf
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[agent]
+mb.ip =MB-IP
+mb.port =MB-PORT
+listen.address =LISTEN_ADDR
+thrift.receiver.ip =CEP-IP
+thrift.receiver.port =CEP-PORT
+thrift.server.admin.username =CEP-ADMIN-USERNAME
+thrift.server.admin.password =CEP-ADMIN-PASSWORD
+param.file.path =/mnt/cartridge-agent/payload/launch-params
+extensions.dir =/mnt/cartridge-agent/extensions
+cep.stats.publisher.enabled =ENABLE_HEALTH_PUBLISHER
+lb.private.ip =LB_PRIVATE_IP
+lb.public.ip =LB_PUBLIC_IP
+enable.artifact.update =ENABLE_ARTFCT_UPDATE
+auto.commit =COMMIT_ENABLED
+auto.checkout =CHECKOUT_ENABLED
+artifact.update.interval =ARTFCT_UPDATE_INT
+port.check.timeout =PORT_CHECK_TIMEOUT
+enable.data.publisher =ENABLE-DATA-PUBLISHER
+monitoring.server.ip =MONITORING-SERVER-IP
+monitoring.server.port =MONITORING-SERVER-PORT
+monitoring.server.secure.port =MONITORING-SERVER-SECURE-PORT
+monitoring.server.admin.username =MONITORING-SERVER-ADMIN-USERNAME
+monitoring.server.admin.password =MONITORING-SERVER-ADMIN-PASSWORD
+log.file.paths =LOG_FILE_PATHS
+APP_PATH =APP-PATH
+super.tenant.repository.path =/repository/deployment/server/
+tenant.repository.path =/repository/tenants/
+extension.instance.started =instance-started.sh
+extension.start.servers =start-servers.sh
+extension.instance.activated =instance-activated.sh
+extension.artifacts.updated =artifacts-updated.sh
+extension.clean =clean.sh
+extension.mount.volumes =mount_volumes.sh
+extension.member.started =member-started.sh
+extension.member.activated =member-activated.sh
+extension.member.suspended =member-suspended.sh
+extension.member.terminated =member-terminated.sh
+extension.complete.topology =complete-topology.sh
+extension.complete.tenant =complete-tenant.sh
+extension.subscription.domain.added =subscription-domain-added.sh
+extension.subscription.domain.removed =subscription-domain-removed.sh
+extension.artifacts.copy =artifacts-copy.sh
+extension.tenant.subscribed =tenant-subscribed.sh
+extension.tenant.unsubscribed =tenant-unsubscribed.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/agent.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/agent.py b/tools/python_cartridgeagent/cartridgeagent/agent.py
new file mode 100644
index 0000000..9f1a972
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/agent.py
@@ -0,0 +1,343 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from modules.exception.parameternotfoundexception import ParameterNotFoundException
+from modules.subscriber import eventsubscriber
+from modules.publisher import cartridgeagentpublisher
+from modules.event.instance.notifier.events import *
+from modules.event.tenant.events import *
+from modules.event.topology.events import *
+from modules.tenant.tenantcontext import *
+from modules.topology.topologycontext import *
+from modules.datapublisher.logpublisher import *
+from modules.config import cartridgeagentconfiguration
+from modules.extensions import defaultextensionhandler
+
+
+class CartridgeAgent(threading.Thread):
+ extension_handler = defaultextensionhandler.DefaultExtensionHandler()
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+
+ mb_ip = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP)
+ mb_port = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT)
+
+ self.__instance_event_subscriber = eventsubscriber.EventSubscriber(
+ cartridgeagentconstants.INSTANCE_NOTIFIER_TOPIC,
+ mb_ip,
+ mb_port)
+ self.__tenant_event_subscriber = eventsubscriber.EventSubscriber(
+ cartridgeagentconstants.TENANT_TOPIC,
+ mb_ip,
+ mb_port)
+ self.__topology_event_subscriber = eventsubscriber.EventSubscriber(
+ cartridgeagentconstants.TOPOLOGY_TOPIC,
+ mb_ip,
+ mb_port)
+
+ self.__tenant_context_initialized = False
+
+ self.log_publish_manager = None
+
+ self.terminated = False
+
+ self.log = LogFactory().get_log(__name__)
+
+ self.cartridge_agent_config = CartridgeAgentConfiguration()
+
+ def run(self):
+ self.log.info("Starting Cartridge Agent...")
+
+ #Check if required prpoerties are set
+ self.validate_required_properties()
+
+ #Start instance notifier listener thread
+ self.subscribe_to_topics_and_register_listeners()
+
+ #Start topology event receiver thread
+ self.register_topology_event_listeners()
+
+ #Start tenant event receiver thread
+ self.register_tenant_event_listeners()
+
+ #wait for intance spawned event
+ while not self.cartridge_agent_config.initialized:
+ self.log.debug("Waiting for Cartridge Agent to be initialized...")
+ time.sleep(1)
+
+ #Execute instance started shell script
+ CartridgeAgent.extension_handler.on_instance_started_event()
+
+ #Publish instance started event
+ cartridgeagentpublisher.publish_instance_started_event()
+
+ #Execute start servers extension
+ try:
+ CartridgeAgent.extension_handler.start_server_extension()
+ except:
+ self.log.exception("Error processing start servers event")
+
+ #Wait for all ports to be active
+ cartridgeagentutils.wait_until_ports_active(
+ self.cartridge_agent_config.listen_address,
+ self.cartridge_agent_config.ports,
+ int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))
+ )
+
+ # check if artifact management is required before publishing instance activated event
+ repo_url = self.cartridge_agent_config.repo_url
+ if repo_url is None or str(repo_url).strip() == "":
+ self.log.info("No artifact repository found")
+ CartridgeAgent.extension_handler.on_instance_activated_event()
+
+ cartridgeagentpublisher.publish_instance_activated_event()
+
+ persistence_mappping_payload = self.cartridge_agent_config.persistence_mappings
+ if persistence_mappping_payload is not None:
+ CartridgeAgent.extension_handler.volume_mount_extension(persistence_mappping_payload)
+
+ # start log publishing thread
+ if DataPublisherConfiguration.get_instance().enabled:
+ log_file_paths = self.cartridge_agent_config.log_file_paths
+ if log_file_paths is None:
+ self.log.exception("No valid log file paths found, no logs will be published")
+ else:
+ self.log_publish_manager = LogPublisherManager(log_file_paths)
+ self.log_publish_manager.start()
+
+ while not self.terminated:
+ time.sleep(1)
+
+ if DataPublisherConfiguration.get_instance().enabled:
+ self.log_publish_manager.terminate_all_publishers()
+
+ def terminate(self):
+ """
+ Allows the CartridgeAgent thread to be terminated
+
+ :return: void
+ """
+ self.terminated = True
+
+ def validate_required_properties(self):
+ """
+ Checks if required properties are set
+ :return: void
+ """
+ #PARAM_FILE_PATH
+ try:
+ self.cartridge_agent_config.read_property(cartridgeagentconstants.PARAM_FILE_PATH)
+ except ParameterNotFoundException:
+ self.log.error("System property not found: %r" % cartridgeagentconstants.PARAM_FILE_PATH)
+ return
+
+ #EXTENSIONS_DIR
+ try:
+ self.cartridge_agent_config.read_property(cartridgeagentconstants.EXTENSIONS_DIR)
+ except ParameterNotFoundException:
+ self.log.error("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR)
+ return
+
+ def subscribe_to_topics_and_register_listeners(self):
+ self.log.debug("Starting instance notifier event message receiver thread")
+
+ self.__instance_event_subscriber.register_handler("ArtifactUpdatedEvent", self.on_artifact_updated)
+ self.__instance_event_subscriber.register_handler("InstanceCleanupMemberEvent", self.on_instance_cleanup_member)
+ self.__instance_event_subscriber.register_handler("InstanceCleanupClusterEvent", self.on_instance_cleanup_cluster)
+
+ self.__instance_event_subscriber.start()
+ self.log.info("Instance notifier event message receiver thread started")
+
+ # wait till subscribed to continue
+ while not self.__instance_event_subscriber.is_subscribed():
+ time.sleep(2)
+
+ def on_artifact_updated(self, msg):
+ event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
+ CartridgeAgent.extension_handler.on_artifact_updated_event(event_obj)
+
+ def on_instance_cleanup_member(self, msg):
+ member_in_payload = self.cartridge_agent_config.member_id
+ event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
+ member_in_event = event_obj.member_id
+ if member_in_payload == member_in_event:
+ CartridgeAgent.extension_handler.on_instance_cleanup_member_event(event_obj)
+
+ def on_instance_cleanup_cluster(self, msg):
+ event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
+ cluster_in_payload = self.cartridge_agent_config.cluster_id
+ cluster_in_event = event_obj.cluster_id
+
+ if cluster_in_event == cluster_in_payload:
+ CartridgeAgent.extension_handler.on_instance_cleanup_cluster_event(event_obj)
+
+ def register_topology_event_listeners(self):
+ self.log.debug("Starting topology event message receiver thread")
+
+ self.__topology_event_subscriber.register_handler("MemberActivatedEvent", self.on_member_activated)
+ self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", self.on_member_terminated)
+ self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended)
+ self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology)
+ self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started)
+ self.__topology_event_subscriber.register_handler("InstanceSpawnedEvent", self.on_instance_spawned)
+
+ self.__topology_event_subscriber.start()
+ self.log.info("Cartridge Agent topology receiver thread started")
+
+ def on_instance_spawned(self, msg):
+ self.log.debug("Instance spawned event received: %r" % msg.payload)
+ if self.cartridge_agent_config.initialized:
+ return
+
+ event_obj = InstanceSpawnedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_instance_spawned_event(event_obj)
+ except:
+ self.log.exception("Error processing instance spawned event")
+
+ def on_member_activated(self, msg):
+ self.log.debug("Member activated event received: %r" % msg.payload)
+ if not self.cartridge_agent_config.initialized:
+ return
+
+ event_obj = MemberActivatedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_member_activated_event(event_obj)
+ except:
+ self.log.exception("Error processing member activated event")
+
+ def on_member_terminated(self, msg):
+ self.log.debug("Member terminated event received: %r" % msg.payload)
+ if not self.cartridge_agent_config.initialized:
+ return
+
+ event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_member_terminated_event(event_obj)
+ except:
+ self.log.exception("Error processing member terminated event")
+
+ def on_member_suspended(self, msg):
+ self.log.debug("Member suspended event received: %r" % msg.payload)
+ if not self.cartridge_agent_config.initialized:
+ return
+
+ event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_member_suspended_event(event_obj)
+ except:
+ self.log.exception("Error processing member suspended event")
+
+ def on_complete_topology(self, msg):
+ if not self.cartridge_agent_config.initialized:
+ self.log.debug("Complete topology event received")
+ event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
+ TopologyContext.update(event_obj.topology)
+ try:
+ CartridgeAgent.extension_handler.on_complete_topology_event(event_obj)
+ except:
+ self.log.exception("Error processing complete topology event")
+ else:
+ self.log.info("Complete topology event updating task disabled")
+
+ def on_member_started(self, msg):
+ self.log.debug("Member started event received: %r" % msg.payload)
+ if not self.cartridge_agent_config.initialized:
+ return
+
+ event_obj = MemberStartedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_member_started_event(event_obj)
+ except:
+ self.log.exception("Error processing member started event")
+
+ def register_tenant_event_listeners(self):
+ self.log.debug("Starting tenant event message receiver thread")
+ self.__tenant_event_subscriber.register_handler("SubscriptionDomainAddedEvent", self.on_subscription_domain_added)
+ self.__tenant_event_subscriber.register_handler("SubscriptionDomainsRemovedEvent", self.on_subscription_domain_removed)
+ self.__tenant_event_subscriber.register_handler("CompleteTenantEvent", self.on_complete_tenant)
+ self.__tenant_event_subscriber.register_handler("TenantSubscribedEvent", self.on_tenant_subscribed)
+ self.__tenant_event_subscriber.register_handler("TenantUnSubscribedEvent", self.on_tenant_unsubscribed)
+
+ self.__tenant_event_subscriber.start()
+ self.log.info("Tenant event message receiver thread started")
+
+ def on_subscription_domain_added(self, msg):
+ self.log.debug("Subscription domain added event received : %r" % msg.payload)
+ event_obj = SubscriptionDomainAddedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_subscription_domain_added_event(event_obj)
+ except:
+ self.log.exception("Error processing subscription domains added event")
+
+ def on_subscription_domain_removed(self, msg):
+ self.log.debug("Subscription domain removed event received : %r" % msg.payload)
+ event_obj = SubscriptionDomainRemovedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_subscription_domain_removed_event(event_obj)
+ except:
+ self.log.exception("Error processing subscription domains removed event")
+
+ def on_complete_tenant(self, msg):
+ if not self.__tenant_context_initialized:
+ self.log.debug("Complete tenant event received")
+ event_obj = CompleteTenantEvent.create_from_json(msg.payload)
+ TenantContext.update(event_obj.tenants)
+
+ try:
+ CartridgeAgent.extension_handler.on_complete_tenant_event(event_obj)
+ self.__tenant_context_initialized = True
+ except:
+ self.log.exception("Error processing complete tenant event")
+ else:
+ self.log.info("Complete tenant event updating task disabled")
+
+ def on_tenant_subscribed(self, msg):
+ self.log.debug("Tenant subscribed event received: %r" % msg.payload)
+ event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_tenant_subscribed_event(event_obj)
+ except:
+ self.log.exception("Error processing tenant subscribed event")
+
+ def on_tenant_unsubscribed(self, msg):
+ self.log.debug("Tenant unSubscribed event received: %r" % msg.payload)
+ event_obj = TenantUnsubscribedEvent.create_from_json(msg.payload)
+ try:
+ CartridgeAgent.extension_handler.on_tenant_unsubscribed_event(event_obj)
+ except:
+ self.log.exception("Error processing tenant unSubscribed event")
+
+
+def main():
+ cartridge_agent = CartridgeAgent()
+ log = LogFactory().get_log(__name__)
+
+ try:
+ log.debug("Starting cartridge agent")
+ cartridge_agent.start()
+ except:
+ log.exception("Cartridge Agent Exception")
+ cartridge_agent.terminate()
+
+
+if __name__ == "__main__":
+ main()
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/logging.ini
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/logging.ini b/tools/python_cartridgeagent/cartridgeagent/logging.ini
new file mode 100644
index 0000000..3e49a96
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/logging.ini
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+[formatters]
+keys=default
+
+[formatter_default]
+format=%(asctime)s:%(levelname)s:%(message)s
+class=logging.Formatter
+
+[handlers]
+keys=console, error_file, log_file
+
+[handler_console]
+class=logging.StreamHandler
+formatter=default
+args=tuple()
+
+[handler_log_file]
+class=logging.FileHandler
+level=LOG_LEVEL
+formatter=default
+args=("agent.log", "w")
+
+[handler_error_file]
+class=logging.FileHandler
+level=ERROR
+formatter=default
+args=("error.log", "w")
+
+[loggers]
+keys=root
+
+[logger_root]
+level=LOG_LEVEL
+formatter=default
+handlers=console,error_file,log_file
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py
new file mode 100644
index 0000000..6da9c58
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py
@@ -0,0 +1,503 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import current_thread, Thread
+from git import *
+from gittle import Gittle, GittleAuth # GitPython and Gittle are both used at the time being for pros and cons of both
+import urllib2
+
+from ... util.log import LogFactory
+from ... util import cartridgeagentutils, extensionutils, cartridgeagentconstants
+from gitrepository import GitRepository
+from ... config import cartridgeagentconfiguration
+from ... util.asyncscheduledtask import AbstractAsyncScheduledTask, ScheduledExecutor
+from ... artifactmgt.repositoryinformation import RepositoryInformation
+
+class AgentGitHandler:
+ """
+ Handles all the git artifact management tasks related to a cartridge
+ """
+
+ log = LogFactory().get_log(__name__)
+
+ SUPER_TENANT_ID = -1234
+ SUPER_TENANT_REPO_PATH = "/repository/deployment/server/"
+ TENANT_REPO_PATH = "/repository/tenants/"
+
+ extension_handler = None
+
+ __git_repositories = {}
+ # (tenant_id => gitrepository.GitRepository)
+
+ cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration()
+
+ @staticmethod
+ def checkout(repo_info):
+ """
+ Checks out the code from the remote repository.
+ If local repository path is empty, a clone operation is done.
+ If there is a cloned repository already on the local repository path, a pull operation
+ will be performed.
+ If there are artifacts not in the repository already on the local repository path,
+ they will be added to a git repository, the remote url added as origin, and then
+ a pull operation will be performed.
+
+ :param RepositoryInformation repo_info: The repository information object
+ :return: A tuple containing whether it was an initial clone or not, and the repository
+ context object
+ :rtype: tuple(bool, GitRepository)
+ """
+ repo_context = AgentGitHandler.get_repo_context(repo_info.tenant_id)
+ if repo_context is not None:
+ #has been previously cloned, this is not the subscription run
+ subscribe_run = False
+ if AgentGitHandler.is_valid_git_repository(repo_context):
+ AgentGitHandler.log.debug("Existing git repository detected for tenant %r, no clone required" % repo_info.tenant_id)
+ AgentGitHandler.pull(repo_context)
+ else:
+ if not os.listdir(repo_context.local_repo_path):
+ #empty dir, clone
+ repo_context.repo = AgentGitHandler.clone(repo_info)
+ else:
+ #not empty
+ if AgentGitHandler.sync_initial_local_artifacts(repo_context):
+ AgentGitHandler.pull(repo_context)
+ else:
+ repo_context = None
+ else:
+ #subscribing run.. need to clone
+ subscribe_run = True
+ repo_context = AgentGitHandler.clone(repo_context)
+
+ return subscribe_run, repo_context
+
+ @staticmethod
+ def sync_initial_local_artifacts(repo_context):
+ #init git repo
+ AgentGitHandler.init(repo_context.local_repo_path)
+
+ # add remote repos
+ return AgentGitHandler.add_remote(repo_context)
+
+ @staticmethod
+ def add_remote(repo_context):
+ try:
+ #add origin remote
+ repo_context.repo.create_remote("origin", repo_context.repo_url)
+ #fetch branch details from origin
+ repo_context.repo.git.fetch()
+ #checkout master branch from origin/master as tracking
+ repo_context.repo.git.branch("-f", "--track", "master", "origin/master")
+ return True
+ except:
+ AgentGitHandler.log.exception("Error in adding remote origin %r for local repository %r"
+ % (repo_context.repo_url, repo_context.local_repo_path))
+ return False
+
+ @staticmethod
+ def init(path):
+ try:
+ repo = Gittle.init(path)
+ return repo
+ except:
+ AgentGitHandler.log.exception("Initializing local repo at %r failed" % path)
+ raise Exception("Initializing local repo at %r failed" % path)
+
+ @staticmethod
+ def is_valid_git_repository(repo_context):
+ if repo_context.cloned:
+ return True
+
+ for ref in repo_context.repo.refs:
+ try:
+ ref._get_object()
+ except ValueError:
+ return False
+
+ return True
+
+ @staticmethod
+ def pull(repo_context):
+ repo = Repo(repo_context.local_repo_path)
+ from ....agent import CartridgeAgent
+ AgentGitHandler.extension_handler = CartridgeAgent.extension_handler
+ try:
+ repo.git.checkout("master")
+ pull_output = repo.git.pull()
+ if "Already up-to-date." not in pull_output:
+ AgentGitHandler.log.debug("Artifacts were updated as a result of the pull operation, thread: %r - %r" % (current_thread().getName(), current_thread().ident))
+
+ AgentGitHandler.extension_handler.on_artifact_update_scheduler_event(repo_context.tenant_id)
+ except GitCommandError as ex:
+ if "fatal: Could not read from remote repository." in ex:
+ #invalid configuration, need to delete and reclone
+ AgentGitHandler.log.warn("Git pull unsuccessful for tenant %r, invalid configuration. %r" % (repo_context.tenant_id, ex))
+ cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path)
+ AgentGitHandler.clone(RepositoryInformation(
+ repo_context.repo_url,
+ repo_context.repo_username,
+ repo_context.repo_password,
+ repo_context.local_repo_path,
+ repo_context.tenant_id,
+ repo_context.is_multitenant,
+ repo_context.commit_enabled
+ ))
+ AgentGitHandler.extension_handler.on_artifact_update_scheduler_event(repo_context.tenant_id)
+ elif "error: Your local changes to the following files would be overwritten by merge:" in ex:
+ #conflict error
+ AgentGitHandler.log.warn("Git pull unsuccessful for tenant %r, conflicts detected." % repo_context.tenant_id)
+ #raise ex
+
+ """
+ 0:'git pull' returned exit status 1: error: Your local changes to the following files would be overwritten by merge:
+ 1: README.md
+ 2: index.php
+ 3:Please, commit your changes or stash them before you can merge.
+ 4:Aborting
+ """
+ conflict_list = []
+ files_arr = str(ex).split("\n")
+ for file_index in range(1, len(files_arr) - 2):
+ file_name = files_arr[file_index].strip()
+ conflict_list.append(file_name)
+ AgentGitHandler.log.debug("Added the file path %r to checkout from the remote repository" % file_name)
+
+ AgentGitHandler.checkout_individually(conflict_list, repo)
+ elif "fatal: unable to access " in ex:
+ #transport error
+ AgentGitHandler.log.exception("Accessing remote git repository %r failed for tenant %r" % (repo_context.repo_url, repo_context.tenant_id))
+ else:
+ AgentGitHandler.log.exception("Git pull operation for tenant %r failed" % repo_context.tenant_id)
+
+ @staticmethod
+ def checkout_individually(conflict_list, repo):
+ try:
+ for conflicted_file in conflict_list:
+ repo.git.checkout(conflicted_file)
+ AgentGitHandler.log.info("Checked out the conflicting files from the remote repository successfully")
+ except:
+ AgentGitHandler.log.exception("Checking out artifacts from index failed")
+
+ @staticmethod
+ def clone(repo_info):
+ repo_context = None
+ try:
+ repo_context = AgentGitHandler.create_git_repo_context(repo_info)
+ #create the directory if it doesn't exist
+ if not os.path.isdir(repo_context.local_repo_path):
+ cartridgeagentutils.create_dir(repo_context.local_repo_path)
+
+ auth = AgentGitHandler.create_auth_configuration(repo_context)
+
+ if auth is not None:
+ # authentication is required, use Gittle
+ gittle_repo = Gittle.clone(repo_context.repo_url, repo_context.local_repo_path, auth=auth)
+ repo = Repo(repo_context.local_repo_path)
+ else:
+ # authentication is not required, use GitPython
+ repo = Repo.clone_from(repo_context.repo_url, repo_context.local_repo_path)
+ gittle_repo = Gittle(repo_context.local_repo_path)
+
+ repo_context.cloned = True
+ repo_context.gittle_repo = gittle_repo
+ repo_context.repo = repo
+ AgentGitHandler.add_repo_context(repo_context)
+ AgentGitHandler.log.info("Git clone operation for tenant %r successful" % repo_context.tenant_id)
+ except urllib2.URLError:
+ AgentGitHandler.log.exception("Accessing remote git repository failed for tenant %r" % repo_context.tenant_id)
+ except OSError:
+ AgentGitHandler.log.exception("Permission denied for repository path for tenant %r" % repo_context.tenant_id)
+ except:
+ AgentGitHandler.log.exception("Git clone operation for tenant %r failed" % repo_context.tenant_id)
+ finally:
+ return repo_context
+
+ @staticmethod
+ def create_auth_configuration(repo_context):
+ """
+ Creates a GittleAuth object based on the type of authorization
+ :param GitRepository repo_context: The repository context object
+ :return: GittleAuth object or None if no authorization needed
+ :rtype: GittleAuth
+ """
+ if repo_context.key_based_auth:
+ pkey = AgentGitHandler.get_private_key()
+ auth = GittleAuth(pkey=pkey)
+ elif repo_context.repo_username.strip() != "" and repo_context.repo_password.strip() != "":
+ auth = GittleAuth(username=repo_context.repo_username, password=repo_context.repo_password)
+ else:
+ auth = None
+
+ return auth
+
+ @staticmethod
+ def get_private_key():
+ """
+ Returns a file handler to the private key path specified by Carbon or default if not specified
+ by Carbon
+ :return: The file object of the private key file
+ :rtype: file
+ """
+ pkey_name = cartridgeagentutils.get_carbon_server_property("SshPrivateKeyName")
+ if pkey_name is None:
+ pkey_name = "wso2"
+
+ pkey_path = cartridgeagentutils.get_carbon_server_property("SshPrivateKeyPath")
+ if pkey_path is None:
+ pkey_path = os.environ["HOME"] + "/.ssh"
+
+ if pkey_path.endswith("/"):
+ pkey_ptr = pkey_path + pkey_name
+ else:
+ pkey_ptr = pkey_path + "/" + pkey_name
+
+ pkey_file = open(pkey_ptr)
+
+ return pkey_file
+
+
+ @staticmethod
+ def add_repo_context(repo_context):
+ AgentGitHandler.__git_repositories[repo_context.tenant_id] = repo_context
+
+ @staticmethod
+ def get_repo_context(tenant_id):
+ """
+
+ :param int tenant_id:
+ :return: GitRepository object
+ :rtype: GitRepository
+ """
+ if tenant_id in AgentGitHandler.__git_repositories:
+ return AgentGitHandler.__git_repositories[tenant_id]
+
+ return None
+
+ @staticmethod
+ def remove_repo_context(tenant_id):
+ if tenant_id in AgentGitHandler.__git_repositories:
+ del AgentGitHandler.__git_repositories[tenant_id]
+
+ @staticmethod
+ def create_git_repo_context(repo_info):
+ repo_context = GitRepository()
+ repo_context.tenant_id = repo_info.tenant_id
+ repo_context.local_repo_path = AgentGitHandler.get_repo_path_for_tenant(
+ repo_info.tenant_id, repo_info.repo_path, repo_info.is_multitenant)
+ repo_context.repo_url = repo_info.repo_url
+ repo_context.repo_username = repo_info.repo_username
+ repo_context.repo_password = repo_info.repo_password
+ repo_context.is_multitenant = repo_info.is_multitenant
+ repo_context.commit_enabled = repo_info.commit_enabled
+
+ if AgentGitHandler.is_key_based_auth(repo_info.repo_url, repo_info.tenant_id):
+ repo_context.key_based_auth = True
+ else:
+ repo_context.key_based_auth = False
+
+ repo_context.cloned = False
+
+ repo_context.repo = None
+ repo_context.gittle_repo = None
+
+ return repo_context
+
+ @staticmethod
+ def is_key_based_auth(repo_url, tenant_id):
+ """
+ Checks if the given git repo has key based authentication
+ :param str repo_url: Git repository remote url
+ :param str tenant_id: Tenant ID
+ :return: True if key based, False otherwise
+ :rtype: bool
+ """
+ if repo_url.startswith("http://") or repo_url.startswith("https://"):
+ # username and password, not key based
+ return False
+ elif repo_url.startswith("git://github.com"):
+ # no auth required
+ return False
+ elif repo_url.startswith("ssh://") or "@" in repo_url:
+ # key based
+ return True
+ else:
+ AgentGitHandler.log.error("Invalid git URL provided for tenant " + tenant_id)
+ raise RuntimeError("Invalid git URL provided for tenant " + tenant_id)
+
+ @staticmethod
+ def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant):
+ repo_path = ""
+
+ if is_multitenant:
+ if tenant_id == AgentGitHandler.SUPER_TENANT_ID:
+ #super tenant, /repository/deploy/server/
+ super_tenant_repo_path = AgentGitHandler.cartridge_agent_config.super_tenant_repository_path
+ #"app_path"
+ repo_path += git_local_repo_path
+
+ if super_tenant_repo_path is not None and super_tenant_repo_path != "":
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") else "/" + super_tenant_repo_path
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") else super_tenant_repo_path + "/"
+ #"app_path/repository/deploy/server/"
+ repo_path += super_tenant_repo_path
+ else:
+ #"app_path/repository/deploy/server/"
+ repo_path += AgentGitHandler.SUPER_TENANT_REPO_PATH
+
+ else:
+ #normal tenant, /repository/tenants/tenant_id
+ tenant_repo_path = AgentGitHandler.cartridge_agent_config.tenant_repository_path
+ #"app_path"
+ repo_path += git_local_repo_path
+
+ if tenant_repo_path is not None and tenant_repo_path != "":
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
+ #"app_path/repository/tenants/244653444"
+ repo_path += tenant_repo_path + tenant_id
+ else:
+ #"app_path/repository/tenants/244653444"
+ repo_path += AgentGitHandler.TENANT_REPO_PATH + tenant_id
+
+ #tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
+ cartridgeagentutils.create_dir(repo_path)
+ else:
+ #not multi tenant, app_path
+ repo_path = git_local_repo_path
+
+ AgentGitHandler.log.debug("Repo path returned : %r" % repo_path)
+ return repo_path
+
+ @staticmethod
+ def commit(repo_info):
+ """
+ Commits and pushes new artifacts to the remote repository
+ :param repo_info:
+ :return:
+ """
+ tenant_id = repo_info.tenant_id
+ repo_context = AgentGitHandler.get_repo_context(tenant_id)
+ gittle_repo = repo_context.gittle_repo
+ try:
+ modified = True if gittle_repo.modified_unstaged_files.count > 0 else False
+ except OSError:
+ # removed files
+ modified = True
+
+ if not modified:
+ AgentGitHandler.log.debug("No changes detected in the local repository for tenant " + tenant_id)
+ return
+
+ gittle_repo.stage(gittle_repo.untracked_files)
+ gittle_repo.stage(gittle_repo.removed_files)
+ gittle_repo.stage(gittle_repo.modified_unstaged_files)
+
+ #commit to local repositpory
+ commit_message = "tenant " + tenant_id + "'s artifacts committed to local repo at " + repo_context.local_repo_path
+
+ try:
+ commit_hash = gittle_repo.commit(name="First Author", email="author@example.org", message=commit_message)
+ AgentGitHandler.log.debug("Committed artifacts for tenant : " + tenant_id + " : " + commit_hash)
+ except:
+ AgentGitHandler.log.exception("Committing artifacts to local repository failed for tenant " + tenant_id)
+
+ #push to remote
+ try:
+ repo = repo_context.repo
+ #TODO: check key based authentication
+ credentialed_remote_url = AgentGitHandler.get_credentialed_remote_url(repo_context)
+ push_remote = repo.create_remote('push_remote', credentialed_remote_url)
+ push_remote.push()
+ AgentGitHandler.log.debug("Pushed artifacts for tenant : " + tenant_id)
+ except:
+ AgentGitHandler.log.exception("Pushing artifacts to remote repository failed for tenant " + tenant_id)
+
+ @staticmethod
+ def get_credentialed_remote_url(repo_context):
+ """
+ Creates a remote url including the credentials
+ :param repo_context:
+ :return:
+ """
+ username = repo_context.repo_username
+ password = repo_context.repo_password
+
+ raise NotImplementedError
+
+ @staticmethod
+ def schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, update_interval):
+ repo_context = AgentGitHandler.get_repo_context(repo_info.tenant_id)
+
+ if repo_context is None:
+ AgentGitHandler.log.error("Unable to schedule artifact sync task, repositoryContext null for tenant %r" % repo_info.tenant_id)
+ return
+
+ if repo_context.scheduled_update_task is None:
+ #TODO: make thread safe
+ artifact_update_task = ArtifactUpdateTask(repo_info, auto_checkout, auto_commit)
+ async_task = ScheduledExecutor(update_interval, artifact_update_task)
+
+ repo_context.scheduled_update_task = async_task
+ async_task.start()
+ AgentGitHandler.log.info("Scheduled Artifact Synchronization Task for path %r" % repo_context.local_repo_path)
+ else:
+ AgentGitHandler.log.info("Artifact Synchronization Task for path %r already scheduled" % repo_context.local_repo_path)
+
+ @staticmethod
+ def remove_repo(tenant_id):
+ repo_context = AgentGitHandler.get_repo_context(tenant_id)
+
+ #stop artifact update task
+ repo_context.scheduled_update_task.terminate()
+
+ #remove git contents
+ cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path)
+
+ AgentGitHandler.remove_repo_context(tenant_id)
+
+ if tenant_id == -1234:
+ if AgentGitHandler.cartridge_agent_config.is_multitenant:
+ extensionutils.execute_copy_artifact_extension(
+ cartridgeagentconstants.SUPERTENANT_TEMP_PATH,
+ AgentGitHandler.cartridge_agent_config.app_path + "/repository/deployment/server/"
+ )
+
+ AgentGitHandler.log.info("git repository deleted for tenant %r" % repo_context.tenant_id)
+
+ return True
+
+
+class ArtifactUpdateTask(AbstractAsyncScheduledTask):
+ """
+ Checks if the autocheckout and autocommit are enabled and executes respective tasks
+ """
+
+ def __init__(self, repo_info, auto_checkout, auto_commit):
+ self.log = LogFactory().get_log(__name__)
+ self.repo_info = repo_info
+ self.auto_checkout = auto_checkout
+ self.auto_commit = auto_commit
+
+ def execute_task(self):
+ try:
+ if self.auto_checkout:
+ AgentGitHandler.checkout(self.repo_info)
+ except:
+ self.log.exception("Auto checkout task failed")
+
+ if self.auto_commit:
+ AgentGitHandler.commit(self.repo_info)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py
new file mode 100644
index 0000000..98a8a44
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ...util.asyncscheduledtask import AsyncScheduledTask
+from gittle import Gittle
+from git import *
+
+class GitRepository:
+ """
+ Represents a git repository inside a particular instance
+ """
+
+ def __init__(self):
+ self.repo_url = None
+ """ :type : str """
+ self.local_repo_path = None
+ """ :type : str """
+ self.cloned = False
+ """ :type : bool """
+ self.repo = None
+ """ :type : git.repo.base.Repo """
+ self.gittle_repo = None
+ """ :type : gittle.gittle.Gittle """
+ self.tenant_id = None
+ """ :type : int """
+ self.key_based_auth = False
+ """ :type : bool """
+ self.repo_username = None
+ """ :type : str """
+ self.repo_password = None
+ """ :type : str """
+ self.is_multitenant = False
+ """ :type : bool """
+ self.commit_enabled = False
+ """ :type : bool """
+ self.scheduled_update_task = None
+ """:type : AsyncScheduledTask """
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py
new file mode 100644
index 0000000..b67eada
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class RepositoryInformation:
+ """
+ Holds repository information to be used in artifact management
+ """
+
+ def __init__(self, repo_url, repo_username, repo_password, repo_path, tenant_id, is_multitenant, commit_enabled):
+ self.repo_url = repo_url
+ """ :type : str """
+ self.repo_username = repo_username
+ """ :type : str """
+ self.repo_password = repo_password
+ """ :type : str """
+ self.repo_path = repo_path
+ """ :type : str """
+ self.tenant_id = tenant_id
+ """ :type : int """
+ self.is_multitenant = is_multitenant
+ """ :type : bool """
+ self.commit_enabled = commit_enabled
+ """ :type : bool """
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py b/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py
new file mode 100644
index 0000000..15871ba
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ConfigParser
+import os
+import socket
+
+from ..util.log import LogFactory
+
+
+class CartridgeAgentConfiguration:
+ """
+ Handles the configuration information of the particular Cartridge Agent
+ """
+ class __CartridgeAgentConfiguration:
+ def __init__(self):
+ # set log level
+ self.log = LogFactory().get_log(__name__)
+
+ self.payload_params = {}
+ self.properties = None
+
+ self.service_group = None
+ """ :type : str """
+ self.is_clustered = False
+ """ :type : bool """
+ self.service_name = None
+ """ :type : str """
+ self.cluster_id = None
+ """ :type : str """
+ self.network_partition_id = None
+ """ :type : str """
+ self.partition_id = None
+ """ :type : str """
+ self.member_id = None
+ """ :type : str """
+ self.cartridge_key = None
+ """ :type : str """
+ self.app_path = None
+ """ :type : str """
+ self.repo_url = None
+ """ :type : str """
+ self.ports = []
+ """ :type : list[str] """
+ self.log_file_paths = []
+ """ :type : list[str] """
+ self.is_multitenant = False
+ """ :type : bool """
+ self.persistence_mappings = None
+ """ :type : str """
+ self.is_commits_enabled = False
+ """ :type : bool """
+ self.is_checkout_enabled = False
+ """ :type : bool """
+ self.listen_address = None
+ """ :type : str """
+ self.is_internal_repo = False
+ """ :type : bool """
+ self.tenant_id = None
+ """ :type : str """
+ self.lb_cluster_id = None
+ """ :type : str """
+ self.min_count = None
+ """ :type : str """
+ self.lb_private_ip = None
+ """ :type : str """
+ self.lb_public_ip = None
+ """ :type : str """
+ self.tenant_repository_path = None
+ """ :type : str """
+ self.super_tenant_repository_path = None
+ """ :type : str """
+ self.deployment = None
+ """ :type : str """
+ self.manager_service_name = None
+ """ :type : str """
+ self.worker_service_name = None
+ """ :type : str """
+ self.is_primary = False
+ """ :type : bool """
+
+ self.payload_params = {}
+ self.__read_conf_file()
+ self.__read_parameter_file()
+
+ self.initialized = False
+ """ :type : bool """
+
+ try:
+ self.service_group = self.payload_params[cartridgeagentconstants.SERVICE_GROUP] \
+ if cartridgeagentconstants.SERVICE_GROUP in self.payload_params \
+ else None
+
+ if cartridgeagentconstants.CLUSTERING in self.payload_params and \
+ str(self.payload_params[cartridgeagentconstants.CLUSTERING]).strip().lower() == "true":
+ self.is_clustered = True
+ else:
+ self.is_clustered = False
+ # self.__isClustered = self.payload_params[
+ # cartridgeagentconstants.CLUSTERING] if cartridgeagentconstants.CLUSTERING in self.payload_params else None
+
+ self.service_name = self.read_property(cartridgeagentconstants.SERVICE_NAME)
+ self.cluster_id = self.read_property(cartridgeagentconstants.CLUSTER_ID)
+ self.network_partition_id = self.read_property(cartridgeagentconstants.NETWORK_PARTITION_ID, False)
+ self.partition_id = self.read_property(cartridgeagentconstants.PARTITION_ID, False)
+ self.member_id = self.get_member_id(cartridgeagentconstants.MEMBER_ID)
+ self.cartridge_key = self.read_property(cartridgeagentconstants.CARTRIDGE_KEY)
+ self.app_path = self.read_property(cartridgeagentconstants.APP_PATH, False)
+ self.repo_url = self.read_property(cartridgeagentconstants.REPO_URL, False)
+ self.ports = str(self.read_property(cartridgeagentconstants.PORTS)).split("|")
+
+ try:
+ self.log_file_paths = str(
+ self.read_property(cartridgeagentconstants.LOG_FILE_PATHS)).strip().split("|")
+ except ParameterNotFoundException as ex:
+ self.log.debug("Cannot read log file path : %r" % ex.get_message())
+ self.log_file_paths = None
+
+ is_multi_str = self.read_property(cartridgeagentconstants.MULTITENANT)
+ self.is_multitenant = True if str(is_multi_str).lower().strip() == "true" else False
+
+ try:
+ self.persistence_mappings = self.read_property(
+ cartridgeagentconstants.PERSISTENCE_MAPPING)
+ except ParameterNotFoundException as ex:
+ self.log.debug("Cannot read persistence mapping : %r" % ex.get_message())
+ self.persistence_mappings = None
+
+ try:
+ is_commit_str = self.read_property(cartridgeagentconstants.COMMIT_ENABLED)
+ self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False
+ except ParameterNotFoundException:
+ try:
+ is_commit_str = self.read_property(cartridgeagentconstants.AUTO_COMMIT)
+ self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False
+ except ParameterNotFoundException:
+ self.log.info(
+ "%r is not found and setting it to false" % cartridgeagentconstants.COMMIT_ENABLED)
+ self.is_commits_enabled = False
+
+ auto_checkout_str = self.read_property(cartridgeagentconstants.AUTO_CHECKOUT, False)
+ self.is_checkout_enabled = True if str(auto_checkout_str).lower().strip() == "true" else False
+
+ self.listen_address = self.read_property(
+ cartridgeagentconstants.LISTEN_ADDRESS, False)
+
+ try:
+ int_repo_str = self.read_property(cartridgeagentconstants.PROVIDER)
+ self.is_internal_repo = True if str(int_repo_str).strip().lower() == cartridgeagentconstants.INTERNAL else False
+ except ParameterNotFoundException:
+ self.log.info(" INTERNAL payload parameter is not found")
+ self.is_internal_repo = False
+
+ self.tenant_id = self.read_property(cartridgeagentconstants.TENANT_ID)
+ self.lb_cluster_id = self.read_property(cartridgeagentconstants.LB_CLUSTER_ID, False)
+ self.min_count = self.read_property(cartridgeagentconstants.MIN_INSTANCE_COUNT, False)
+ self.lb_private_ip = self.read_property(cartridgeagentconstants.LB_PRIVATE_IP, False)
+ self.lb_public_ip = self.read_property(cartridgeagentconstants.LB_PUBLIC_IP, False)
+ self.tenant_repository_path = self.read_property(cartridgeagentconstants.TENANT_REPO_PATH, False)
+ self.super_tenant_repository_path = self.read_property(cartridgeagentconstants.SUPER_TENANT_REPO_PATH, False)
+
+ try:
+ self.deployment = self.read_property(
+ cartridgeagentconstants.DEPLOYMENT)
+ except ParameterNotFoundException:
+ self.deployment = None
+
+ # Setting worker-manager setup - manager service name
+ if self.deployment is None:
+ self.manager_service_name = None
+
+ if str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower():
+ self.manager_service_name = self.service_name
+
+ elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+ self.deployment = self.read_property(
+ cartridgeagentconstants.MANAGER_SERVICE_TYPE)
+
+ elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower():
+ self.deployment = None
+ else:
+ self.deployment = None
+
+ # Setting worker-manager setup - worker service name
+ if self.deployment is None:
+ self.worker_service_name = None
+
+ if str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+ self.manager_service_name = self.service_name
+
+ elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower():
+ self.deployment = self.read_property(
+ cartridgeagentconstants.WORKER_SERVICE_TYPE)
+
+ elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower():
+ self.deployment = None
+ else:
+ self.deployment = None
+
+ try:
+ self.is_primary = self.read_property(
+ cartridgeagentconstants.CLUSTERING_PRIMARY_KEY)
+ except ParameterNotFoundException:
+ self.is_primary = None
+ except ParameterNotFoundException as ex:
+ raise RuntimeError(ex)
+
+ self.log.info("Cartridge agent configuration initialized")
+
+ self.log.debug("service-name: %r" % self.service_name)
+ self.log.debug("cluster-id: %r" % self.cluster_id)
+ self.log.debug(
+ "network-partition-id: %r" % self.network_partition_id)
+ self.log.debug("partition-id: %r" % self.partition_id)
+ self.log.debug("member-id: %r" % self.member_id)
+ self.log.debug("cartridge-key: %r" % self.cartridge_key)
+ self.log.debug("app-path: %r" % self.app_path)
+ self.log.debug("repo-url: %r" % self.repo_url)
+ self.log.debug("ports: %r" % str(self.ports))
+ self.log.debug("lb-private-ip: %r" % self.lb_private_ip)
+ self.log.debug("lb-public-ip: %r" % self.lb_public_ip)
+
+ def get_member_id(self, member_id_field):
+ """
+ Reads the member id from the payload file or configuration file. If neither of
+ these sources contain the member id, the hostname is assigned to it and returned.
+ :param str member_id_field: the key of the member id to lookup
+ :return: The member id
+ :rtype : str
+ """
+ try:
+ member_id = self.read_property(member_id_field)
+ except ParameterNotFoundException:
+ try:
+ self.log.info("Reading hostname from container")
+ member_id = socket.gethostname()
+ except:
+ self.log.exception("Hostname can not be resolved")
+ member_id = "unknown"
+
+ self.log.debug("MemberId is taking the value of hostname : [" + member_id + "] ")
+ return member_id
+
+ def __read_conf_file(self):
+ """
+ Reads and stores the agent's configuration file
+ :return: void
+ """
+
+ conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "agent.conf"
+ self.log.debug("Config file path : %r" % conf_file_path)
+ self.properties = ConfigParser.SafeConfigParser()
+ self.properties.read(conf_file_path)
+
+ def __read_parameter_file(self):
+ """
+ Reads the payload file of the cartridge and stores the values in a dictionary
+ :return: void
+ """
+
+ param_file = self.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False)
+ self.log.debug("Param file path : %r" % param_file)
+
+ try:
+ if param_file is not None:
+ metadata_file = open(param_file)
+ metadata_payload_content = metadata_file.read()
+ for param in metadata_payload_content.split(","):
+ if param.strip() != "":
+ param_value = param.strip().split("=")
+ self.payload_params[param_value[0]] = param_value[1]
+
+ # self.payload_params = dict(
+ # param.split("=") for param in metadata_payload_content.split(","))
+ metadata_file.close()
+ else:
+ self.log.error("File not found: %r" % param_file)
+ except:
+ self.log.exception(
+ "Could not read launch parameter file, hence trying to read from System properties.")
+
+ def read_property(self, property_key, critical=True):
+ """
+ Returns the value of the provided property
+ :param str property_key: the name of the property to be read
+ :return: Value of the property,
+ :rtype: str
+ :exception: ParameterNotFoundException if the provided property cannot be found
+ """
+
+ if self.properties.has_option("agent", property_key):
+ self.log.debug("Has key: %r" % property_key)
+ temp_str = self.properties.get("agent", property_key)
+ if temp_str != "" and temp_str is not None:
+ if str(temp_str).strip().lower() == "null":
+ return ""
+ else:
+ return str(temp_str).strip()
+
+ if property_key in self.payload_params:
+ temp_str = self.payload_params[property_key]
+ if temp_str != "" and temp_str is not None:
+ if str(temp_str).strip().lower() == "null":
+ return ""
+ else:
+ return str(temp_str).strip()
+
+ if critical:
+ raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key)
+
+ instance = None
+ """ :type : __CartridgeAgentConfiguration"""
+
+ # def __new__(cls, *args, **kwargs):
+ # if not CartridgeAgentConfiguration.instance:
+ # CartridgeAgentConfiguration.instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration()
+ #
+ # return CartridgeAgentConfiguration.instance
+
+ def __init__(self):
+ if not CartridgeAgentConfiguration.instance:
+ CartridgeAgentConfiguration.instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration()
+
+ def __getattr__(self, name):
+ return getattr(self.instance, name)
+
+ def __setattr__(self, name, value):
+ return setattr(self.instance, name, value)
+
+
+from ..exception.parameternotfoundexception import ParameterNotFoundException
+from ..util import cartridgeagentconstants
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py
new file mode 100644
index 0000000..1859d8a
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from thrift.publisher import *
+from ..util.log import *
+
+
+class StreamDefinition:
+ """
+ Represents a BAM/CEP stream definition
+ """
+
+ STRING = 'STRING'
+ DOUBLE = 'DOUBLE'
+ INT = 'INT'
+ LONG = 'LONG'
+ BOOL = 'BOOL'
+
+ def __init__(self):
+ self.name = None
+ """:type : str"""
+ self.version = None
+ """:type : str"""
+ self.nickname = None
+ """:type : str"""
+ self.description = None
+ """:type : str"""
+ self.meta_data = []
+ """:type : list[str]"""
+ self.correlation_data = []
+ """:type : list[str]"""
+ self.payload_data = []
+ """:type : list[str]"""
+ self.stream_id = None
+ """ :type : str """
+
+ def add_metadata_attribute(self, attr_name, attr_type):
+ self.meta_data.append({"name": attr_name, "type": attr_type})
+
+ def add_payloaddata_attribute(self, attr_name, attr_type):
+ self.payload_data.append({"name": attr_name, "type": attr_type})
+
+ def add_correlationdata_attribute(self, attr_name, attr_type):
+ self.correlation_data.append({"name": attr_name, "type": attr_type})
+
+ def __str__(self):
+ """
+ To string override
+ """
+
+ json_str = "{"
+ json_str += "\"name\":\"" + self.name + "\","
+ json_str += "\"version\":\"" + self.version + "\","
+ json_str += "\"nickName\":\"" + self.nickname + "\","
+ json_str += "\"description\":\"" + self.description + "\","
+
+ # add metadata attributes if exists
+ if len(self.meta_data) > 0:
+ json_str += "\"metaData\":["
+ for metadatum in self.meta_data:
+ json_str += "{\"name\":\"" + metadatum["name"] + "\", \"type\": \"" + metadatum["type"] + "\"},"
+
+ json_str = json_str[:-1] + "],"
+
+ # add correlationdata attributes if exists
+ if len(self.correlation_data) > 0:
+ json_str += "\"correlationData\":["
+ for coredatum in self.correlation_data:
+ json_str += "{\"name\":\"" + coredatum["name"] + "\", \"type\": \"" + coredatum["type"] + "\"},"
+
+ json_str = json_str[:-1] + "],"
+
+ # add payloaddata attributes if exists
+ if len(self.payload_data) > 0:
+ json_str += "\"payloadData\":["
+ for payloaddatum in self.payload_data:
+ json_str += "{\"name\":\"" + payloaddatum["name"] + "\", \"type\": \"" + payloaddatum["type"] + "\"},"
+
+ json_str = json_str[:-1] + "],"
+
+ json_str = json_str[:-1] + "}"
+
+ return json_str
+
+
+class ThriftEvent:
+ """
+ Represents an event to be published to a BAM/CEP monitoring server
+ """
+ def __init__(self):
+ self.metaData = []
+ """:type : list[str]"""
+ self.correlationData = []
+ """:type : list[str]"""
+ self.payloadData = []
+ """:type : list[str]"""
+
+
+class ThriftPublisher:
+ """
+ Handles publishing events to BAM/CEP through thrift using the provided address and credentials
+ """
+ log = LogFactory().get_log(__name__)
+
+ def __init__(self, ip, port, username, password, stream_definition):
+ """
+ Initializes a ThriftPublisher object.
+
+ At initialization a ThriftPublisher connects and defines a stream definition. A connection
+ should be disconnected after all the publishing has been done.
+
+ :param str ip: IP address of the monitoring server
+ :param str port: Port of the monitoring server
+ :param str username: Username
+ :param str password: Password
+ :param StreamDefinition stream_definition: StreamDefinition object for this particular connection
+ :return: ThriftPublisher object
+ :rtype: ThriftPublisher
+ """
+ try:
+ port_number = int(port)
+ except ValueError:
+ raise RuntimeError("Port number for Thrift Publisher is invalid: %r" % port)
+
+ self.__publisher = Publisher(ip, port_number)
+ self.__publisher.connect(username, password)
+ self.__publisher.defineStream(str(stream_definition))
+
+ self.stream_definition = stream_definition
+ self.stream_id = self.__publisher.streamId
+ self.ip = ip
+ self.port = port
+ self.username = username
+ self.password = password
+
+ def publish(self, event):
+ """
+ Publishes the given event by creating the event bundle from the log event
+
+ :param ThriftEvent event: The log event to be published
+ :return: void
+ """
+ event_bundler = EventBundle()
+ ThriftPublisher.assign_attributes(event.metaData, event_bundler)
+ ThriftPublisher.assign_attributes(event.correlationData, event_bundler)
+ ThriftPublisher.assign_attributes(event.payloadData, event_bundler)
+
+ self.__publisher.publish(event_bundler)
+ self.log.debug("Published event to thrift stream [%r]" % self.stream_id)
+
+ def disconnect(self):
+ """
+ Disconnect the thrift publisher
+ :return: void
+ """
+ self.__publisher.disconnect()
+
+ @staticmethod
+ def assign_attributes(attributes, event_bundler):
+ """
+ Adds the given attributes to the given event bundler according to type of each attribute
+ :param list attributes: attributes to be assigned
+ :param EventBundle event_bundler: Event bundle to assign attributes to
+ :return: void
+ """
+
+ # __intAttributeList = []
+ # __longAttributeList = []
+ # __doubleAttributeList = []
+ # __boolAttributeList = []
+ # __stringAttributeList = []
+
+ if attributes is not None and len(attributes) > 0:
+ for attrib in attributes:
+ if isinstance(attrib, int):
+ event_bundler.addIntAttribute(attrib)
+ elif isinstance(attrib, long):
+ event_bundler.addLongAttribute(attrib)
+ elif isinstance(attrib, float):
+ event_bundler.addDoubleAttribute(attrib)
+ elif isinstance(attrib, bool):
+ event_bundler.addBoolAttribute(attrib)
+ elif isinstance(attrib, str):
+ event_bundler.addStringAttribute(attrib)
+ else:
+ ThriftPublisher.log.error("Undefined attribute type: %r" % attrib)
+ else:
+ ThriftPublisher.log.debug("Empty attribute list")
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py
new file mode 100644
index 0000000..adefd8e
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py
@@ -0,0 +1 @@
+__all__ = ['ttypes', 'constants']
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py
new file mode 100644
index 0000000..36943ba
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py
@@ -0,0 +1,8 @@
+#
+# Autogenerated by Thrift Compiler (0.9.1)
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+# options string: py
+#
+