You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/08/31 22:12:36 UTC
stratos git commit: Adding Thrift test server to PCA integration tests
Repository: stratos
Updated Branches:
refs/heads/master 1a6da949f -> 7f634c5ee
Adding Thrift test server to PCA integration tests
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7f634c5e
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7f634c5e
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7f634c5e
Branch: refs/heads/master
Commit: 7f634c5ee9062fd2c37aeffd5bbb490c030a054a
Parents: 1a6da94
Author: Akila Perera <ra...@gmail.com>
Authored: Tue Sep 1 01:41:28 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Tue Sep 1 01:41:40 2015 +0530
----------------------------------------------------------------------
.../cartridge.agent/cartridge.agent/config.py | 13 +-
.../modules/databridge/thrift/publisher.py | 4 +-
.../thrift/thrift/transport/TSSLSocket.py | 357 ++++++++++---------
pom.xml | 16 +
.../python-cartridge-agent/integration/pom.xml | 62 +++-
.../test/ADCMTAppTenantUserTest.java | 6 +-
.../test/ADCMTAppTest.java | 6 +-
.../python.cartridge.agent/test/ADCTest.java | 6 +-
.../test/AgentStartupTest.java | 8 +-
.../test/DataPublisherTestUtil.java | 46 +++
.../test/PythonAgentTestManager.java | 86 +++--
.../test/ThriftTestServer.java | 213 +++++++++++
.../test/resources/common/client-truststore.jks | Bin 0 -> 37935 bytes
.../resources/common/data-bridge-config.xml | 75 ++++
.../src/test/resources/common/log4j.properties | 41 +++
.../common/stratos-health-stream-def.json | 1 +
.../resources/common/thrift-agent-config.xml | 64 ++++
.../src/test/resources/common/wso2carbon.jks | Bin 0 -> 33260 bytes
.../src/test/resources/log4j.properties | 41 ---
.../test-conf/integration-test.properties | 3 +-
20 files changed, 768 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
index fdf3880..983c4c4 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
@@ -18,11 +18,12 @@
import ConfigParser
import os
+from yapsy.PluginManager import PluginManager
+
from modules.util.log import LogFactory
from exception import ParameterNotFoundException
import constants
from plugins.contracts import ICartridgeAgentPlugin, IArtifactManagementPlugin, IHealthStatReaderPlugin
-from yapsy.PluginManager import PluginManager
class Config:
@@ -134,18 +135,18 @@ class Config:
:rtype: ConfigParser.SafeConfigParser()
"""
- conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/agent.conf"
+ conf_file_path = os.path.abspath(os.path.dirname(__file__)) + "/agent.conf"
Config.log.debug("Config file path : %r" % conf_file_path)
properties = ConfigParser.SafeConfigParser()
properties.read(conf_file_path)
# set calculated values
- param_file = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/payload/launch-params"
+ param_file = os.path.abspath(os.path.dirname(__file__)) + "/payload/launch-params"
properties.set("agent", constants.PARAM_FILE_PATH, param_file)
- plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/plugins"
+ plugins_dir = os.path.abspath(os.path.dirname(__file__)) + "/plugins"
properties.set("agent", constants.PLUGINS_DIR, plugins_dir)
- plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/extensions/py"
+ plugins_dir = os.path.abspath(os.path.dirname(__file__)) + "/extensions/py"
properties.set("agent", constants.EXTENSIONS_DIR, plugins_dir)
return properties
@@ -264,7 +265,7 @@ class Config:
Config.application_id = Config.read_property(constants.APPLICATION_ID)
Config.service_name = Config.read_property(constants.SERVICE_NAME)
Config.cluster_id = Config.read_property(constants.CLUSTER_ID)
- Config.ports = Config.read_property(constants.PORTS).replace("'","").split("|")
+ Config.ports = Config.read_property(constants.PORTS).replace("'", "").split("|")
Config.is_multiTenant = Config.read_property(constants.MULTITENANT)
Config.tenant_id = Config.read_property(constants.TENANT_ID)
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/publisher.py
index de96ced..e9c605f 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/publisher.py
@@ -21,7 +21,7 @@ sys.path.append("gen")
from gen.ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
from gen.Data.ttypes import ThriftEventBundle
-
+from ...util.log import LogFactory
from thrift.transport import TSSLSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
@@ -30,6 +30,7 @@ from thrift.protocol import TBinaryProtocol
# Define publisher class
class Publisher:
client = None
+ log = LogFactory().get_log(__name__)
def __init__(self, ip, port, stream_definition):
# Make SSL socket
@@ -72,7 +73,6 @@ class Publisher:
class EventBundle:
-
def __init__(self):
self.__sessionId = ""
self.__eventNum = 0
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py
index df35be4..9bb9771 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py
@@ -26,189 +26,192 @@ from TTransport import TTransportException
class TSSLSocket(TSocket.TSocket):
- """
- SSL implementation of client-side TSocket
-
- This class creates outbound sockets wrapped using the
- python standard ssl module for encrypted connections.
-
- The protocol used is set using the class variable
- SSL_VERSION, which must be one of ssl.PROTOCOL_* and
- defaults to ssl.PROTOCOL_TLSv1 for greatest security.
- """
- SSL_VERSION = ssl.PROTOCOL_TLSv1
-
- def __init__(self,
- host='localhost',
- port=9090,
- validate=True,
- ca_certs=None,
- keyfile=None,
- certfile=None,
- unix_socket=None):
- """Create SSL TSocket
-
- @param validate: Set to False to disable SSL certificate validation
- @type validate: bool
- @param ca_certs: Filename to the Certificate Authority pem file, possibly a
- file downloaded from: http://curl.haxx.se/ca/cacert.pem This is passed to
- the ssl_wrap function as the 'ca_certs' parameter.
- @type ca_certs: str
- @param keyfile: The private key
- @type keyfile: str
- @param certfile: The cert file
- @type certfile: str
-
- Raises an IOError exception if validate is True and the ca_certs file is
- None, not present or unreadable.
"""
- self.validate = validate
- self.is_valid = False
- self.peercert = None
- if not validate:
- self.cert_reqs = ssl.CERT_NONE
- else:
- self.cert_reqs = ssl.CERT_REQUIRED
- self.ca_certs = ca_certs
- self.keyfile = keyfile
- self.certfile = certfile
- if validate:
- if ca_certs is None or not os.access(ca_certs, os.R_OK):
- raise IOError('Certificate Authority ca_certs file "%s" '
- 'is not readable, cannot validate SSL '
- 'certificates.' % (ca_certs))
- TSocket.TSocket.__init__(self, host, port, unix_socket)
-
- def open(self):
- try:
- res0 = self._resolveAddr()
- for res in res0:
- sock_family, sock_type = res[0:2]
- ip_port = res[4]
- plain_sock = socket.socket(sock_family, sock_type)
- self.handle = ssl.wrap_socket(plain_sock,
- ssl_version=self.SSL_VERSION,
- do_handshake_on_connect=True,
- ca_certs=self.ca_certs,
- keyfile=self.keyfile,
- certfile=self.certfile,
- cert_reqs=self.cert_reqs)
- self.handle.settimeout(self._timeout)
+ SSL implementation of client-side TSocket
+
+ This class creates outbound sockets wrapped using the
+ python standard ssl module for encrypted connections.
+
+ The protocol used is set using the class variable
+ SSL_VERSION, which must be one of ssl.PROTOCOL_* and
+ defaults to ssl.PROTOCOL_TLSv1 for greatest security.
+ """
+ SSL_VERSION = ssl.PROTOCOL_TLSv1
+ CIPHERS = "DEFAULT:!ECDH"
+
+ def __init__(self,
+ host='localhost',
+ port=9090,
+ validate=True,
+ ca_certs=None,
+ keyfile=None,
+ certfile=None,
+ unix_socket=None):
+ """Create SSL TSocket
+
+ @param validate: Set to False to disable SSL certificate validation
+ @type validate: bool
+ @param ca_certs: Filename to the Certificate Authority pem file, possibly a
+ file downloaded from: http://curl.haxx.se/ca/cacert.pem This is passed to
+ the ssl_wrap function as the 'ca_certs' parameter.
+ @type ca_certs: str
+ @param keyfile: The private key
+ @type keyfile: str
+ @param certfile: The cert file
+ @type certfile: str
+
+ Raises an IOError exception if validate is True and the ca_certs file is
+ None, not present or unreadable.
+ """
+ TSocket.TSocket.__init__(self, host, port, unix_socket)
+
+ self.validate = validate
+ self.is_valid = False
+ self.peercert = None
+ if not validate:
+ self.cert_reqs = ssl.CERT_NONE
+ else:
+ self.cert_reqs = ssl.CERT_REQUIRED
+ self.ca_certs = ca_certs
+ self.keyfile = keyfile
+ self.certfile = certfile
+ if validate:
+ if ca_certs is None or not os.access(ca_certs, os.R_OK):
+ raise IOError('Certificate Authority ca_certs file "%s" '
+ 'is not readable, cannot validate SSL '
+ 'certificates.' % (ca_certs))
+
+ def open(self):
try:
- self.handle.connect(ip_port)
+ res0 = self._resolveAddr()
+ for res in res0:
+ sock_family, sock_type = res[0:2]
+ ip_port = res[4]
+ plain_sock = socket.socket(sock_family, sock_type)
+ self.handle = ssl.wrap_socket(plain_sock,
+ ssl_version=TSSLSocket.SSL_VERSION,
+ do_handshake_on_connect=True,
+ ca_certs=self.ca_certs,
+ keyfile=self.keyfile,
+ certfile=self.certfile,
+ cert_reqs=self.cert_reqs,
+ ciphers=TSSLSocket.CIPHERS)
+ self.handle.settimeout(self._timeout)
+ try:
+ self.handle.connect(ip_port)
+ except socket.error, e:
+ if res is not res0[-1]:
+ continue
+ else:
+ raise e
+ break
except socket.error, e:
- if res is not res0[-1]:
- continue
- else:
- raise e
- break
- except socket.error, e:
- if self._unix_socket:
- message = 'Could not connect to secure socket %s: %s' \
- % (self._unix_socket, e)
- else:
- message = 'Could not connect to %s:%d: %s' % (self.host, self.port, e)
- raise TTransportException(type=TTransportException.NOT_OPEN,
- message=message)
- if self.validate:
- self._validate_cert()
-
- def _validate_cert(self):
- """internal method to validate the peer's SSL certificate, and to check the
- commonName of the certificate to ensure it matches the hostname we
- used to make this connection. Does not support subjectAltName records
- in certificates.
-
- raises TTransportException if the certificate fails validation.
- """
- cert = self.handle.getpeercert()
- self.peercert = cert
- if 'subject' not in cert:
- raise TTransportException(
- type=TTransportException.NOT_OPEN,
- message='No SSL certificate found from %s:%s' % (self.host, self.port))
- fields = cert['subject']
- for field in fields:
- # ensure structure we get back is what we expect
- if not isinstance(field, tuple):
- continue
- cert_pair = field[0]
- if len(cert_pair) < 2:
- continue
- cert_key, cert_value = cert_pair[0:2]
- if cert_key != 'commonName':
- continue
- certhost = cert_value
- # this check should be performed by some sort of Access Manager
- if certhost == self.host:
- # success, cert commonName matches desired hostname
- self.is_valid = True
- return
- else:
+ if self._unix_socket:
+ message = 'Could not connect to secure socket %s: %s' \
+ % (self._unix_socket, e)
+ else:
+ message = 'Could not connect to %s:%d: %s' % (self.host, self.port, e)
+ raise TTransportException(type=TTransportException.NOT_OPEN,
+ message=message)
+ if self.validate:
+ self._validate_cert()
+
+ def _validate_cert(self):
+ """internal method to validate the peer's SSL certificate, and to check the
+ commonName of the certificate to ensure it matches the hostname we
+ used to make this connection. Does not support subjectAltName records
+ in certificates.
+
+ raises TTransportException if the certificate fails validation.
+ """
+ cert = self.handle.getpeercert()
+ self.peercert = cert
+ if 'subject' not in cert:
+ raise TTransportException(
+ type=TTransportException.NOT_OPEN,
+ message='No SSL certificate found from %s:%s' % (self.host, self.port))
+ fields = cert['subject']
+ for field in fields:
+ # ensure structure we get back is what we expect
+ if not isinstance(field, tuple):
+ continue
+ cert_pair = field[0]
+ if len(cert_pair) < 2:
+ continue
+ cert_key, cert_value = cert_pair[0:2]
+ if cert_key != 'commonName':
+ continue
+ certhost = cert_value
+ # this check should be performed by some sort of Access Manager
+ if certhost == self.host:
+ # success, cert commonName matches desired hostname
+ self.is_valid = True
+ return
+ else:
+ raise TTransportException(
+ type=TTransportException.UNKNOWN,
+ message='Hostname we connected to "%s" doesn\'t match certificate '
+ 'provided commonName "%s"' % (self.host, certhost))
raise TTransportException(
- type=TTransportException.UNKNOWN,
- message='Hostname we connected to "%s" doesn\'t match certificate '
- 'provided commonName "%s"' % (self.host, certhost))
- raise TTransportException(
- type=TTransportException.UNKNOWN,
- message='Could not validate SSL certificate from '
- 'host "%s". Cert=%s' % (self.host, cert))
+ type=TTransportException.UNKNOWN,
+ message='Could not validate SSL certificate from '
+ 'host "%s". Cert=%s' % (self.host, cert))
class TSSLServerSocket(TSocket.TServerSocket):
- """SSL implementation of TServerSocket
-
- This uses the ssl module's wrap_socket() method to provide SSL
- negotiated encryption.
- """
- SSL_VERSION = ssl.PROTOCOL_TLSv1
-
- def __init__(self,
- host=None,
- port=9090,
- certfile='cert.pem',
- unix_socket=None):
- """Initialize a TSSLServerSocket
-
- @param certfile: filename of the server certificate, defaults to cert.pem
- @type certfile: str
- @param host: The hostname or IP to bind the listen socket to,
- i.e. 'localhost' for only allowing local network connections.
- Pass None to bind to all interfaces.
- @type host: str
- @param port: The port to listen on for inbound connections.
- @type port: int
- """
- self.setCertfile(certfile)
- TSocket.TServerSocket.__init__(self, host, port)
-
- def setCertfile(self, certfile):
- """Set or change the server certificate file used to wrap new connections.
+ """SSL implementation of TServerSocket
- @param certfile: The filename of the server certificate,
- i.e. '/etc/certs/server.pem'
- @type certfile: str
-
- Raises an IOError exception if the certfile is not present or unreadable.
+ This uses the ssl module's wrap_socket() method to provide SSL
+ negotiated encryption.
"""
- if not os.access(certfile, os.R_OK):
- raise IOError('No such certfile found: %s' % (certfile))
- self.certfile = certfile
-
- def accept(self):
- plain_client, addr = self.handle.accept()
- try:
- client = ssl.wrap_socket(plain_client, certfile=self.certfile,
- server_side=True, ssl_version=self.SSL_VERSION)
- except ssl.SSLError, ssl_exc:
- # failed handshake/ssl wrap, close socket to client
- plain_client.close()
- # raise ssl_exc
- # We can't raise the exception, because it kills most TServer derived
- # serve() methods.
- # Instead, return None, and let the TServer instance deal with it in
- # other exception handling. (but TSimpleServer dies anyway)
- return None
- result = TSocket.TSocket()
- result.setHandle(client)
- return result
+ SSL_VERSION = ssl.PROTOCOL_TLSv1
+
+ def __init__(self,
+ host=None,
+ port=9090,
+ certfile='cert.pem',
+ unix_socket=None):
+ """Initialize a TSSLServerSocket
+
+ @param certfile: filename of the server certificate, defaults to cert.pem
+ @type certfile: str
+ @param host: The hostname or IP to bind the listen socket to,
+ i.e. 'localhost' for only allowing local network connections.
+ Pass None to bind to all interfaces.
+ @type host: str
+ @param port: The port to listen on for inbound connections.
+ @type port: int
+ """
+ self.setCertfile(certfile)
+ TSocket.TServerSocket.__init__(self, host, port)
+
+ def setCertfile(self, certfile):
+ """Set or change the server certificate file used to wrap new connections.
+
+ @param certfile: The filename of the server certificate,
+ i.e. '/etc/certs/server.pem'
+ @type certfile: str
+
+ Raises an IOError exception if the certfile is not present or unreadable.
+ """
+ if not os.access(certfile, os.R_OK):
+ raise IOError('No such certfile found: %s' % (certfile))
+ self.certfile = certfile
+
+ def accept(self):
+ plain_client, addr = self.handle.accept()
+ try:
+ client = ssl.wrap_socket(plain_client, certfile=self.certfile,
+ server_side=True, ssl_version=self.SSL_VERSION)
+ except ssl.SSLError, ssl_exc:
+ # failed handshake/ssl wrap, close socket to client
+ plain_client.close()
+ # raise ssl_exc
+ # We can't raise the exception, because it kills most TServer derived
+ # serve() methods.
+ # Instead, return None, and let the TServer instance deal with it in
+ # other exception handling. (but TSimpleServer dies anyway)
+ return None
+ result = TSocket.TSocket()
+ result.setHandle(client)
+ return result
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0804613..e52aa6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -449,6 +449,21 @@
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.receiver.thrift</artifactId>
+ <version>${carbon.analytics.common.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.receiver.binary</artifactId>
+ <version>${carbon.analytics.common.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.core</artifactId>
+ <version>${carbon.analytics.common.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -678,6 +693,7 @@
<carbon.platform.patch.version.4.2.6>4.2.6</carbon.platform.patch.version.4.2.6>
<carbon.platform.patch.version.4.2.7>4.2.7</carbon.platform.patch.version.4.2.7>
<carbon.platform.patch.version.4.2.8>4.2.8</carbon.platform.patch.version.4.2.8>
+ <carbon.analytics.common.version>5.0.0</carbon.analytics.common.version>
<!-- 3rd party library versions -->
<synapse.wso2.feature.version>2.1.2-wso2v3</synapse.wso2.feature.version>
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/pom.xml
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/pom.xml b/products/python-cartridge-agent/integration/pom.xml
index b9583b1..946bf01 100755
--- a/products/python-cartridge-agent/integration/pom.xml
+++ b/products/python-cartridge-agent/integration/pom.xml
@@ -36,6 +36,9 @@
<directory>src/test/resources/test-conf</directory>
<filtering>true</filtering>
</resource>
+ <resource>
+ <directory>src/test/resources/common</directory>
+ </resource>
</resources>
</build>
@@ -158,42 +161,42 @@
<artifactId>commons-exec</artifactId>
<version>1.0.1</version>
</dependency>
- <dependency>
+ <!--dependency>
<groupId>org.apache.stratos</groupId>
<artifactId>org.apache.stratos.common</artifactId>
<version>${project.version}</version>
- </dependency>
+ </dependency-->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.1.1</version>
</dependency>
- <dependency>
+ <!--dependency>
<groupId>org.apache.axis2.wso2</groupId>
<artifactId>axis2-client</artifactId>
<version>${axis2.wso2.version}</version>
- </dependency>
- <dependency>
+ </dependency-->
+ <!--dependency>
<groupId>org.apache.httpcomponents.wso2</groupId>
<artifactId>httpcore</artifactId>
<version>4.3.0.wso2v1</version>
- </dependency>
- <dependency>
+ </dependency-->
+ <!--dependency>
<groupId>org.apache.httpcomponents.wso2</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5.wso2v1</version>
- </dependency>
- <dependency>
+ </dependency-->
+ <!--dependency>
<groupId>org.apache.stratos</groupId>
<artifactId>org.apache.stratos.mock.iaas.client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- </dependency>
- <dependency>
+ </dependency-->
+ <!--dependency>
<groupId>org.wso2.andes.wso2</groupId>
<artifactId>andes-client</artifactId>
<version>0.13.wso2v8</version>
- </dependency>
+ </dependency-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
@@ -208,11 +211,40 @@
<groupId>org.apache.stratos</groupId>
<artifactId>org.apache.stratos.messaging</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.receiver.thrift</artifactId>
+ <version>${carbon.analytics.common.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.receiver.binary</artifactId>
+ <version>${carbon.analytics.common.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.core</artifactId>
+ <version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.2</version>
+ <groupId>org.wso2.carbon.analytics-common</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+ <version>${carbon.analytics.common.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTenantUserTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTenantUserTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTenantUserTest.java
index a604f55..07dd6b2 100644
--- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTenantUserTest.java
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTenantUserTest.java
@@ -38,7 +38,7 @@ import static junit.framework.Assert.assertTrue;
public class ADCMTAppTenantUserTest extends PythonAgentTestManager {
private static final Log log = LogFactory.getLog(ADCMTAppTenantUserTest.class);
private static final int ADC_TIMEOUT = 180000;
- private static final String RESOURCES_PATH = "/suite-4";
+ private static final String SUITE_NAME = "suite-4";
private static final String APPLICATION_PATH = "/tmp/pca-test-suite-4";
private static final String CLUSTER_ID = "tomcat.domain";
private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-4";
@@ -56,10 +56,10 @@ public class ADCMTAppTenantUserTest extends PythonAgentTestManager {
@BeforeSuite
public void setupADCMTAppTest() {
// Set jndi.properties.dir system property for initializing event publishers and receivers
- System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH));
+ System.setProperty("jndi.properties.dir", getResourcesPath(SUITE_NAME));
// start Python agent with configurations provided in resource path
- setup(RESOURCES_PATH);
+ setup(SUITE_NAME);
// Simulate server socket
startServerSocket(8080);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTest.java
index 2a4c8b5..dbab83e 100644
--- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTest.java
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCMTAppTest.java
@@ -38,7 +38,7 @@ import static junit.framework.Assert.assertTrue;
public class ADCMTAppTest extends PythonAgentTestManager {
private static final Log log = LogFactory.getLog(ADCMTAppTest.class);
private static final int ADC_TIMEOUT = 180000;
- private static final String RESOURCES_PATH = "/suite-3";
+ private static final String SUITE_NAME = "suite-3";
private static final String APPLICATION_PATH = "/tmp/pca-test-suite-3";
private static final String CLUSTER_ID = "tomcat.domain";
private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-3";
@@ -56,10 +56,10 @@ public class ADCMTAppTest extends PythonAgentTestManager {
@BeforeSuite
public void setupADCMTAppTest() {
// Set jndi.properties.dir system property for initializing event publishers and receivers
- System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH));
+ System.setProperty("jndi.properties.dir", getResourcesPath(SUITE_NAME));
// start Python agent with configurations provided in resource path
- setup(RESOURCES_PATH);
+ setup(SUITE_NAME);
// Simulate server socket
startServerSocket(8080);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java
index de5be67..d1355e1 100755
--- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java
@@ -40,7 +40,7 @@ import static junit.framework.Assert.assertTrue;
public class ADCTest extends PythonAgentTestManager {
private static final Log log = LogFactory.getLog(ADCTest.class);
private static final int ADC_TIMEOUT = 180000;
- private static final String RESOURCES_PATH = "/suite-2";
+ private static final String SUITE_NAME = "suite-2";
private static final String APPLICATION_PATH = "/tmp/pca-test-suite-2";
private static final String CLUSTER_ID = "tomcat.domain";
private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-2";
@@ -58,10 +58,10 @@ public class ADCTest extends PythonAgentTestManager {
@BeforeSuite
public void setupADCTest() {
// Set jndi.properties.dir system property for initializing event publishers and receivers
- System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH));
+ System.setProperty("jndi.properties.dir", getResourcesPath(SUITE_NAME));
// start Python agent with configurations provided in resource path
- setup(RESOURCES_PATH);
+ setup(SUITE_NAME);
// Simulate server socket
startServerSocket(8080);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/AgentStartupTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/AgentStartupTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/AgentStartupTest.java
index c08ab5c..615cd8e 100755
--- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/AgentStartupTest.java
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/AgentStartupTest.java
@@ -35,8 +35,8 @@ import java.util.Properties;
public class AgentStartupTest extends PythonAgentTestManager {
private static final Log log = LogFactory.getLog(AgentStartupTest.class);
- private static final int STARTUP_TIMEOUT = 60000;
- private static final String RESOURCES_PATH = "/suite-1";
+ private static final int STARTUP_TIMEOUT = 120000;
+ private static final String SUITE_NAME = "suite-1";
private static final String CLUSTER_ID = "php.php.domain";
private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1";
private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1";
@@ -52,10 +52,10 @@ public class AgentStartupTest extends PythonAgentTestManager {
@BeforeSuite
public void setupAgentStartupTest() {
// Set jndi.properties.dir system property for initializing event publishers and receivers
- System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH));
+ System.setProperty("jndi.properties.dir", getResourcesPath(SUITE_NAME));
// start Python agent with configurations provided in resource path
- setup(RESOURCES_PATH);
+ setup(SUITE_NAME);
// Simulate server socket
startServerSocket(8080);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/DataPublisherTestUtil.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/DataPublisherTestUtil.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/DataPublisherTestUtil.java
new file mode 100644
index 0000000..7f1d985
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/DataPublisherTestUtil.java
@@ -0,0 +1,46 @@
+package org.apache.stratos.python.cartridge.agent.test;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+
+public class DataPublisherTestUtil {
+ public static final String LOCAL_HOST = "localhost";
+
+ public static void setTrustStoreParams() {
+ String trustStore = PythonAgentTestManager.getResourcesPath("common");
+ System.setProperty("javax.net.ssl.trustStore", trustStore + File.separator + "client-truststore.jks");
+ System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+ }
+
+ public static void setKeyStoreParams() {
+ String keyStore = PythonAgentTestManager.getResourcesPath("common");
+ System.setProperty("Security.KeyStore.Location", keyStore + File.separator + "wso2carbon.jks");
+ System.setProperty("Security.KeyStore.Password", "wso2carbon");
+ }
+
+ public static String getDataAgentConfigPath() {
+ String filePath = PythonAgentTestManager.getResourcesPath("common");
+ return filePath + File.separator + "data-agent-config.xml";
+ }
+
+ public static String getDataBridgeConfigPath() {
+ String filePath = PythonAgentTestManager.getResourcesPath("common");
+ return filePath + File.separator + "data-bridge-config.xml";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonAgentTestManager.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonAgentTestManager.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonAgentTestManager.java
index 5ba6c6a..2394392 100644
--- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonAgentTestManager.java
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonAgentTestManager.java
@@ -43,6 +43,7 @@ import java.util.zip.ZipInputStream;
public class PythonAgentTestManager {
protected final Properties integrationProperties = new Properties();
+ public static final String PATH_SEP = File.separator;
private static final Log log = LogFactory.getLog(PythonAgentTestManager.class);
protected BrokerService broker = new BrokerService();
@@ -51,6 +52,7 @@ public class PythonAgentTestManager {
public static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "activemq.amqp.bind.address";
public static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "activemq.mqtt.bind.address";
public static final String CEP_PORT = "cep.port";
+ public static final String CEP_SSL_PORT = "cep.ssl.port";
public static final String DISTRIBUTION_NAME = "distribution.name";
protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID();
@@ -58,6 +60,7 @@ public class PythonAgentTestManager {
protected Map<String, Executor> executorList = new HashMap<String, Executor>();
protected int cepPort;
+ protected int cepSSLPort;
protected String amqpBindAddress;
protected String mqttBindAddress;
protected String distributionName;
@@ -68,6 +71,7 @@ public class PythonAgentTestManager {
protected boolean instanceStarted;
protected boolean instanceActivated;
protected ByteArrayOutputStreamLocal outputStream;
+ private ThriftTestServer thriftTestServer;
/**
* Setup method for test method testPythonCartridgeAgent
@@ -110,15 +114,33 @@ public class PythonAgentTestManager {
this.eventReceiverInitiated = true;
}
- // Simulate CEP Thrift server
- //startServerSocket(cepPort);
- // TODO: create a mock thrift server; sockets will not work with health stats publisher
+
+ // Start Thrift server to emulate CEP
+ thriftTestServer = new ThriftTestServer();
+ try {
+ File file = new File(getResourcesPath("common") + PATH_SEP + "stratos-health-stream-def.json");
+ FileInputStream fis = new FileInputStream(file);
+ byte[] data = new byte[(int) file.length()];
+ fis.read(data);
+ fis.close();
+ String str = new String(data, "UTF-8");
+ if (str.equals("")) {
+ log.warn("Stream definition of health stat stream is empty. Thrift server will not function properly");
+ }
+ thriftTestServer.addStreamDefinition(str, -1234);
+ // start with non-ssl port; test server will automatically bind to ssl port
+ thriftTestServer.start(cepPort);
+ log.info("Started Thrift server with stream definition: " + str);
+ }
+ catch (Exception e) {
+ log.error("Could not start Thrift test server", e);
+ }
+
+
String agentPath = setupPythonAgent(resourcePath);
log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME);
log.info("Starting python cartridge agent...");
- this.outputStream = executeCommand("python " + agentPath + "/agent.py > " +
- PythonAgentTestManager.class.getResource(File.separator).getPath() + "/../" + PYTHON_AGENT_DIR_NAME +
- "/cartridge-agent-console.log");
+ this.outputStream = executeCommand("python " + agentPath + PATH_SEP + "agent.py");
}
@@ -151,6 +173,14 @@ public class PythonAgentTestManager {
catch (IOException ignore) {
}
}
+ try {
+ if (thriftTestServer != null) {
+ thriftTestServer.stop();
+ }
+ }
+ catch (Exception e) {
+ log.error("Could not stop Thrift test server", e);
+ }
try {
log.info("Deleting source checkout folder...");
@@ -174,11 +204,12 @@ public class PythonAgentTestManager {
public PythonAgentTestManager() {
try {
integrationProperties
- .load(PythonAgentTestManager.class.getResourceAsStream("/integration-test.properties"));
+ .load(PythonAgentTestManager.class.getResourceAsStream(PATH_SEP + "integration-test.properties"));
distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME);
amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS);
mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS);
cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT));
+ cepSSLPort = Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT));
log.info("PCA integration properties: " + integrationProperties.toString());
}
catch (IOException e) {
@@ -192,8 +223,8 @@ public class PythonAgentTestManager {
broker.addConnector(mqttBindAddress);
broker.setBrokerName("testBroker");
broker.setDataDirectory(
- PythonAgentTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME +
- "/activemq-data");
+ PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
+ PYTHON_AGENT_DIR_NAME + PATH_SEP + "activemq-data");
broker.start();
log.info("Broker service started!");
}
@@ -215,7 +246,7 @@ public class PythonAgentTestManager {
log.error("ERROR found in PCA log", e);
}
}
- log.info(line);
+ log.info("[PCA] " + line);
}
}
sleep(100);
@@ -269,11 +300,13 @@ public class PythonAgentTestManager {
protected static String getResourcesPath() {
- return PythonAgentTestManager.class.getResource("/").getPath() + "/../../src/test/resources";
+ return PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
+ ".." + PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources";
}
protected static String getResourcesPath(String resourcesPath) {
- return PythonAgentTestManager.class.getResource("/").getPath() + "/../../src/test/resources" + resourcesPath;
+ return PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + ".." + PATH_SEP + ".." +
+ PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources" + PATH_SEP + resourcesPath;
}
/**
@@ -286,29 +319,31 @@ public class PythonAgentTestManager {
log.info("Setting up python cartridge agent...");
- String srcAgentPath = PythonAgentTestManager.class.getResource("/").getPath() +
- "/../../../distribution/target/" + distributionName + ".zip";
+ String srcAgentPath = PythonAgentTestManager.class.getResource(PATH_SEP).getPath() +
+ PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + "distribution" + PATH_SEP +
+ "target" + PATH_SEP + distributionName + ".zip";
String unzipDestPath =
- PythonAgentTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME + "/";
+ PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
+ PYTHON_AGENT_DIR_NAME + PATH_SEP;
//FileUtils.copyFile(new File(srcAgentPath), new File(destAgentPath));
unzip(srcAgentPath, unzipDestPath);
- String destAgentPath = PythonAgentTestManager.class.getResource("/").getPath() + "/../" +
- PYTHON_AGENT_DIR_NAME + "/" + distributionName;
+ String destAgentPath = PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." +
+ PATH_SEP + PYTHON_AGENT_DIR_NAME + PATH_SEP + distributionName;
- String srcAgentConfPath = getResourcesPath(resourcesPath) + "/agent.conf";
- String destAgentConfPath = destAgentPath + "/agent.conf";
+ String srcAgentConfPath = getResourcesPath(resourcesPath) + PATH_SEP + "agent.conf";
+ String destAgentConfPath = destAgentPath + PATH_SEP + "agent.conf";
FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath));
- String srcLoggingIniPath = getResourcesPath(resourcesPath) + "/logging.ini";
- String destLoggingIniPath = destAgentPath + "/logging.ini";
+ String srcLoggingIniPath = getResourcesPath(resourcesPath) + PATH_SEP + "logging.ini";
+ String destLoggingIniPath = destAgentPath + PATH_SEP + "logging.ini";
FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath));
- String srcPayloadPath = getResourcesPath(resourcesPath) + "/payload";
- String destPayloadPath = destAgentPath + "/payload";
+ String srcPayloadPath = getResourcesPath(resourcesPath) + PATH_SEP + "payload";
+ String destPayloadPath = destAgentPath + PATH_SEP + "payload";
FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath));
log.info("Changing extension scripts permissions");
- File extensionsPath = new File(destAgentPath + "/extensions/bash");
+ File extensionsPath = new File(destAgentPath + PATH_SEP + "extensions" + PATH_SEP + "bash");
File[] extensions = extensionsPath.listFiles();
for (File extension : extensions) {
extension.setExecutable(true);
@@ -371,7 +406,8 @@ public class PythonAgentTestManager {
DefaultExecutor exec = new DefaultExecutor();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
exec.setWorkingDirectory(new File(
- PythonAgentTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME));
+ PythonAgentTestManager.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
+ PYTHON_AGENT_DIR_NAME));
exec.setStreamHandler(streamHandler);
ExecuteWatchdog watchdog = new ExecuteWatchdog(TIMEOUT);
exec.setWatchdog(watchdog);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ThriftTestServer.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ThriftTestServer.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ThriftTestServer.java
new file mode 100644
index 0000000..aaa9ba0
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ThriftTestServer.java
@@ -0,0 +1,213 @@
+package org.apache.stratos.python.cartridge.agent.test;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.Logger;
+import org.wso2.carbon.databridge.commons.Credentials;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
+import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
+import org.wso2.carbon.databridge.core.AgentCallback;
+import org.wso2.carbon.databridge.core.DataBridge;
+import org.wso2.carbon.databridge.core.Utils.AgentSession;
+import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
+import org.wso2.carbon.databridge.core.exception.DataBridgeException;
+import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
+import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
+import org.wso2.carbon.databridge.receiver.thrift.ThriftDataReceiver;
+import org.wso2.carbon.user.api.UserStoreException;
+
+import java.net.SocketException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ThriftTestServer {
+ Logger log = Logger.getLogger(ThriftTestServer.class);
+ ThriftDataReceiver thriftDataReceiver;
+ InMemoryStreamDefinitionStore streamDefinitionStore;
+ AtomicInteger numberOfEventsReceived;
+ RestarterThread restarterThread;
+
+ public void startTestServer() throws DataBridgeException, InterruptedException {
+ ThriftTestServer thriftTestServer = new ThriftTestServer();
+ thriftTestServer.start(7611);
+ Thread.sleep(100000000);
+ thriftTestServer.stop();
+ }
+
+
+ public void addStreamDefinition(StreamDefinition streamDefinition, int tenantId)
+ throws StreamDefinitionStoreException {
+ streamDefinitionStore.saveStreamDefinitionToStore(streamDefinition, tenantId);
+ }
+
+ public void addStreamDefinition(String streamDefinitionStr, int tenantId)
+ throws StreamDefinitionStoreException, MalformedStreamDefinitionException {
+ StreamDefinition streamDefinition = EventDefinitionConverterUtils.convertFromJson(streamDefinitionStr);
+ getStreamDefinitionStore().saveStreamDefinitionToStore(streamDefinition, tenantId);
+ }
+
+ private InMemoryStreamDefinitionStore getStreamDefinitionStore() {
+ if (streamDefinitionStore == null) {
+ streamDefinitionStore = new InMemoryStreamDefinitionStore();
+ }
+ return streamDefinitionStore;
+ }
+
+ public void start(int receiverPort) throws DataBridgeException {
+ DataPublisherTestUtil.setKeyStoreParams();
+ streamDefinitionStore = getStreamDefinitionStore();
+ numberOfEventsReceived = new AtomicInteger(0);
+ DataBridge databridge = new DataBridge(new AuthenticationHandler() {
+ @Override
+ public boolean authenticate(String userName,
+ String password) {
+ log.info("Thrift authentication returning true");
+ return true;// allays authenticate to true
+
+ }
+
+ @Override
+ public String getTenantDomain(String userName) {
+ return "admin";
+ }
+
+ @Override
+ public int getTenantId(String tenantDomain) throws UserStoreException {
+ return -1234;
+ }
+
+ @Override
+ public void initContext(AgentSession agentSession) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ log.info("Initializing Thrift agent context");
+ }
+
+ @Override
+ public void destroyContext(AgentSession agentSession) {
+
+ }
+ }, streamDefinitionStore, DataPublisherTestUtil.getDataBridgeConfigPath());
+
+ thriftDataReceiver = new ThriftDataReceiver(receiverPort, databridge);
+
+ databridge.subscribe(new AgentCallback() {
+ int totalSize = 0;
+
+ public void definedStream(StreamDefinition streamDefinition,
+ int tenantId) {
+ log.info("StreamDefinition " + streamDefinition);
+ }
+
+ @Override
+ public void removeStream(StreamDefinition streamDefinition, int tenantId) {
+ log.info("StreamDefinition remove " + streamDefinition);
+ }
+
+ @Override
+ public void receive(List<Event> eventList, Credentials credentials) {
+ numberOfEventsReceived.addAndGet(eventList.size());
+ log.info("Received events : " + numberOfEventsReceived);
+// log.info("eventListSize=" + eventList.size() + " eventList " + eventList + " for username " + credentials.getUsername());
+ }
+
+ });
+
+ String address = "localhost";
+ log.info("Test Server starting on " + address);
+ thriftDataReceiver.start(address);
+ log.info("Test Server Started");
+ }
+
+ public int getNumberOfEventsReceived() {
+ if (numberOfEventsReceived != null) {
+ return numberOfEventsReceived.get();
+ } else {
+ return 0;
+ }
+ }
+
+ public void resetReceivedEvents() {
+ numberOfEventsReceived.set(0);
+ }
+
+ public void stop() {
+ thriftDataReceiver.stop();
+ log.info("Test Server Stopped");
+ }
+
+ public void stopAndStartDuration(int port, long stopAfterTimeMilliSeconds, long startAfterTimeMS)
+ throws SocketException, DataBridgeException {
+ restarterThread = new RestarterThread(port, stopAfterTimeMilliSeconds, startAfterTimeMS);
+ Thread thread = new Thread(restarterThread);
+ thread.start();
+ }
+
+ public int getEventsReceivedBeforeLastRestart() {
+ return restarterThread.eventReceived;
+ }
+
+
+ class RestarterThread implements Runnable {
+ int eventReceived;
+ int port;
+
+ long stopAfterTimeMilliSeconds;
+ long startAfterTimeMS;
+
+ RestarterThread(int port, long stopAfterTime, long startAfterTime) {
+ this.port = port;
+ stopAfterTimeMilliSeconds = stopAfterTime;
+ startAfterTimeMS = startAfterTime;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(stopAfterTimeMilliSeconds);
+ }
+ catch (InterruptedException e) {
+ }
+ if (thriftDataReceiver != null) {
+ thriftDataReceiver.stop();
+ }
+
+ eventReceived = getNumberOfEventsReceived();
+
+ log.info("Number of events received in server shutdown :" + eventReceived);
+ try {
+ Thread.sleep(startAfterTimeMS);
+ }
+ catch (InterruptedException e) {
+ }
+
+ try {
+ if (thriftDataReceiver != null) {
+ thriftDataReceiver.start(DataPublisherTestUtil.LOCAL_HOST);
+ } else {
+ start(port);
+ }
+ }
+ catch (DataBridgeException e) {
+ log.error(e);
+ }
+
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/client-truststore.jks
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/client-truststore.jks b/products/python-cartridge-agent/integration/src/test/resources/common/client-truststore.jks
new file mode 100644
index 0000000..2d22c24
Binary files /dev/null and b/products/python-cartridge-agent/integration/src/test/resources/common/client-truststore.jks differ
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/data-bridge-config.xml
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/data-bridge-config.xml b/products/python-cartridge-agent/integration/src/test/resources/common/data-bridge-config.xml
new file mode 100644
index 0000000..13ca54c
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/resources/common/data-bridge-config.xml
@@ -0,0 +1,75 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<dataBridgeConfiguration>
+
+ <workerThreads>10</workerThreads>
+ <eventBufferCapacity>10000</eventBufferCapacity>
+ <clientTimeoutMin>30</clientTimeoutMin>
+
+ <dataReceiver name="Thrift">
+ <config name="tcpPort">7611</config>
+ <config name="sslPort">7711</config>
+ </dataReceiver>
+
+ <dataReceiver name="Binary">
+ <config name="tcpPort">9611</config>
+ <config name="sslPort">9711</config>
+ <config name="sslReceiverThreadPoolSize">100</config>
+ <config name="tcpReceiverThreadPoolSize">100</config>
+ </dataReceiver>
+
+ <!--<streamDefinitions>
+ <streamDefinition>
+ {
+ 'name':'org.wso2.esb.MediatorStatistics',
+ 'version':'1.3.0',
+ 'nickName': 'Stock Quote Information',
+ 'description': 'Some Desc',
+ 'metaData':[
+ {'name':'ipAdd','type':'STRING'}
+ ],
+ 'payloadData':[
+ {'name':'symbol','type':'STRING'},
+ {'name':'price','type':'DOUBLE'},
+ {'name':'volume','type':'INT'},
+ {'name':'max','type':'DOUBLE'},
+ {'name':'min','type':'Double'}
+ ]
+ }
+ </streamDefinition>
+ <streamDefinition domainName="wso2">
+ {
+ 'name':'org.wso2.esb.MediatorStatistics',
+ 'version':'1.3.4',
+ 'nickName': 'Stock Quote Information',
+ 'description': 'Some Other Desc',
+ 'metaData':[
+ {'name':'ipAdd','type':'STRING'}
+ ],
+ 'payloadData':[
+ {'name':'symbol','type':'STRING'},
+ {'name':'price','type':'DOUBLE'},
+ {'name':'volume','type':'INT'}
+ ]
+ }
+ </streamDefinition>
+ </streamDefinitions>-->
+
+</dataBridgeConfiguration>
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/log4j.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/log4j.properties b/products/python-cartridge-agent/integration/src/test/resources/common/log4j.properties
new file mode 100755
index 0000000..c0c6e78
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/resources/common/log4j.properties
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level and appenders
+log4j.rootLogger=INFO, CONSOLE_APPENDER, FILE_APPENDER
+
+# CONSOLE_APPENDER is set to be a ConsoleAppender.
+log4j.appender.CONSOLE_APPENDER=org.apache.log4j.ConsoleAppender
+
+# The standard error log where all the warnings, errors and fatal errors will be logged
+log4j.appender.FILE_APPENDER=org.apache.log4j.FileAppender
+log4j.appender.FILE_APPENDER.File=cartridge-agent.log
+log4j.appender.FILE_APPENDER.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE_APPENDER.layout.ConversionPattern=%d{ISO8601} [%X{ip}-%X{host}] [%t] %5p %c{1} %m%n
+log4j.appender.FILE_APPENDER.threshold=DEBUG
+
+# CONSOLE_APPENDER uses PatternLayout.
+log4j.appender.CONSOLE_APPENDER.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE_APPENDER.layout.ConversionPattern=[%d{ISO8601}] %5p - [%c{1}] %m%n
+
+log4j.logger.org.apache.stratos.cartridge.agent=INFO
+log4j.logger.org.apache.stratos.messaging=INFO
+log4j.logger.org.apache.stratos.common.util=DEBUG
+log4j.logger.org.wso2.andes.client=ERROR
+log4j.logger.org.apache.activemq.jndi.ActiveMQInitialContextFactory=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/stratos-health-stream-def.json
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/stratos-health-stream-def.json b/products/python-cartridge-agent/integration/src/test/resources/common/stratos-health-stream-def.json
new file mode 100644
index 0000000..4d36c19
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/resources/common/stratos-health-stream-def.json
@@ -0,0 +1 @@
+{"name":"cartridge_agent_health_stats","version":"1.0.0","nickName":"agent health stats","description":"agent health stats","payloadData":[{"name":"cluster_id", "type": "STRING"},{"name":"cluster_instance_id", "type": "STRING"},{"name":"network_partition_id", "type": "STRING"},{"name":"member_id", "type": "STRING"},{"name":"partition_id", "type": "STRING"},{"name":"health_description", "type": "STRING"},{"name":"value", "type": "DOUBLE"}]}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/thrift-agent-config.xml
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/thrift-agent-config.xml b/products/python-cartridge-agent/integration/src/test/resources/common/thrift-agent-config.xml
new file mode 100644
index 0000000..dbb2ba3
--- /dev/null
+++ b/products/python-cartridge-agent/integration/src/test/resources/common/thrift-agent-config.xml
@@ -0,0 +1,64 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<DataAgentsConfiguration>
+ <Agent>
+ <Name>Thrift</Name>
+ <DataEndpointClass>org.wso2.carbon.databridge.agent.endpoint.thrift.ThriftDataEndpoint</DataEndpointClass>
+ <!--<TrustSore>src/main/resources/client-truststore.jks</TrustSore>-->
+ <!--<TrustSorePassword>wso2carbon</TrustSorePassword>-->
+ <QueueSize>32768</QueueSize>
+ <BatchSize>200</BatchSize>
+ <CorePoolSize>5</CorePoolSize>
+ <MaxPoolSize>10</MaxPoolSize>
+ <SocketTimeoutMS>30000</SocketTimeoutMS>
+ <KeepAliveTimeInPool>20</KeepAliveTimeInPool>
+ <ReconnectionInterval>30</ReconnectionInterval>
+ <MaxTransportPoolSize>250</MaxTransportPoolSize>
+ <MaxIdleConnections>250</MaxIdleConnections>
+ <EvictionTimePeriod>5500</EvictionTimePeriod>
+ <MinIdleTimeInPool>5000</MinIdleTimeInPool>
+ <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
+ <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
+ <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
+ <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
+ </Agent>
+
+ <Agent>
+ <Name>Binary</Name>
+ <DataEndpointClass>org.wso2.carbon.databridge.agent.endpoint.binary.BinaryDataEndpoint</DataEndpointClass>
+ <!--<TrustSore>src/main/resources/client-truststore.jks</TrustSore>-->
+ <!--<TrustSorePassword>wso2carbon</TrustSorePassword>-->
+ <QueueSize>32768</QueueSize>
+ <BatchSize>200</BatchSize>
+ <CorePoolSize>5</CorePoolSize>
+ <MaxPoolSize>10</MaxPoolSize>
+ <SocketTimeoutMS>30000</SocketTimeoutMS>
+ <KeepAliveTimeInPool>20</KeepAliveTimeInPool>
+ <ReconnectionInterval>30</ReconnectionInterval>
+ <MaxTransportPoolSize>250</MaxTransportPoolSize>
+ <MaxIdleConnections>250</MaxIdleConnections>
+ <EvictionTimePeriod>5500</EvictionTimePeriod>
+ <MinIdleTimeInPool>5000</MinIdleTimeInPool>
+ <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
+ <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
+ <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
+ <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
+ </Agent>
+</DataAgentsConfiguration>
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/common/wso2carbon.jks
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/common/wso2carbon.jks b/products/python-cartridge-agent/integration/src/test/resources/common/wso2carbon.jks
new file mode 100644
index 0000000..7942c53
Binary files /dev/null and b/products/python-cartridge-agent/integration/src/test/resources/common/wso2carbon.jks differ
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/log4j.properties b/products/python-cartridge-agent/integration/src/test/resources/log4j.properties
deleted file mode 100755
index c0c6e78..0000000
--- a/products/python-cartridge-agent/integration/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Set root logger level and appenders
-log4j.rootLogger=INFO, CONSOLE_APPENDER, FILE_APPENDER
-
-# CONSOLE_APPENDER is set to be a ConsoleAppender.
-log4j.appender.CONSOLE_APPENDER=org.apache.log4j.ConsoleAppender
-
-# The standard error log where all the warnings, errors and fatal errors will be logged
-log4j.appender.FILE_APPENDER=org.apache.log4j.FileAppender
-log4j.appender.FILE_APPENDER.File=cartridge-agent.log
-log4j.appender.FILE_APPENDER.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE_APPENDER.layout.ConversionPattern=%d{ISO8601} [%X{ip}-%X{host}] [%t] %5p %c{1} %m%n
-log4j.appender.FILE_APPENDER.threshold=DEBUG
-
-# CONSOLE_APPENDER uses PatternLayout.
-log4j.appender.CONSOLE_APPENDER.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE_APPENDER.layout.ConversionPattern=[%d{ISO8601}] %5p - [%c{1}] %m%n
-
-log4j.logger.org.apache.stratos.cartridge.agent=INFO
-log4j.logger.org.apache.stratos.messaging=INFO
-log4j.logger.org.apache.stratos.common.util=DEBUG
-log4j.logger.org.wso2.andes.client=ERROR
-log4j.logger.org.apache.activemq.jndi.ActiveMQInitialContextFactory=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f634c5e/products/python-cartridge-agent/integration/src/test/resources/test-conf/integration-test.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/integration/src/test/resources/test-conf/integration-test.properties b/products/python-cartridge-agent/integration/src/test/resources/test-conf/integration-test.properties
index 1242841..332441a 100755
--- a/products/python-cartridge-agent/integration/src/test/resources/test-conf/integration-test.properties
+++ b/products/python-cartridge-agent/integration/src/test/resources/test-conf/integration-test.properties
@@ -20,7 +20,8 @@ distribution.version=${project.version}
distribution.name=${python.cartridge.agent.distribution.name}-${project.version}
activemq.amqp.bind.address=tcp://localhost:61617
activemq.mqtt.bind.address=mqtt://localhost:1885
-cep.port=7712
+cep.port=7612
+cep.ssl.port=7712
stratos.endpoint=http://localhost:9763
stratos.admin.username=admin
stratos.admin.password=admin
\ No newline at end of file