You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/04/12 08:55:51 UTC

[05/18] camel git commit: Initial import of Camel Milo

Initial import of Camel Milo

Signed-off-by: Jens Reimann <jr...@redhat.com>


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7b3837fa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7b3837fa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7b3837fa

Branch: refs/heads/master
Commit: 7b3837fae7acf115885da3ea3958b79042240d26
Parents: 586f940
Author: Jens Reimann <jr...@redhat.com>
Authored: Mon Apr 10 15:48:30 2017 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Apr 12 10:55:26 2017 +0200

----------------------------------------------------------------------
 components/camel-milo/pom.xml                   | 104 ++++
 .../camel-milo/src/main/docs/milo-client.adoc   | 168 ++++++
 .../camel-milo/src/main/docs/milo-server.adoc   | 111 ++++
 .../camel/component/milo/KeyStoreLoader.java    | 145 +++++
 .../apache/camel/component/milo/Messages.java   |  38 ++
 .../camel/component/milo/NamespaceId.java       |  83 +++
 .../camel/component/milo/PartialNodeId.java     | 119 ++++
 .../milo/client/MiloClientComponent.java        | 219 +++++++
 .../milo/client/MiloClientConfiguration.java    | 292 ++++++++++
 .../milo/client/MiloClientConnection.java       | 127 +++++
 .../milo/client/MiloClientConsumer.java         |  92 +++
 .../milo/client/MiloClientEndpoint.java         | 242 ++++++++
 .../client/MiloClientItemConfiguration.java     |  28 +
 .../milo/client/MiloClientProducer.java         |  63 ++
 .../client/internal/SubscriptionManager.java    | 570 +++++++++++++++++++
 .../component/milo/client/package-info.java     |  20 +
 .../milo/converter/ValueConverter.java          |  38 ++
 .../component/milo/converter/package-info.java  |   4 +
 .../camel/component/milo/package-info.java      |  20 +
 .../milo/server/MiloServerComponent.java        | 466 +++++++++++++++
 .../milo/server/MiloServerConsumer.java         |  78 +++
 .../milo/server/MiloServerEndpoint.java         | 106 ++++
 .../milo/server/MiloServerProducer.java         |  38 ++
 .../milo/server/internal/CamelNamespace.java    | 213 +++++++
 .../milo/server/internal/CamelServerItem.java   | 146 +++++
 .../component/milo/server/package-info.java     |  20 +
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/TypeConverter     |   1 +
 .../org/apache/camel/component/milo-client      |   1 +
 .../org/apache/camel/component/milo-server      |   1 +
 .../component/milo/AbstractMiloServerTest.java  |  90 +++
 .../MonitorItemMultiConnectionsCertTest.java    | 124 ++++
 .../milo/MonitorItemMultiConnectionsTest.java   |  99 ++++
 .../camel/component/milo/MonitorItemTest.java   |  89 +++
 .../apache/camel/component/milo/NodeIdTest.java | 100 ++++
 .../camel/component/milo/WriteClientTest.java   | 156 +++++
 .../component/milo/converter/ConverterTest.java |  49 ++
 .../component/milo/server/ServerLocalTest.java  |  83 +++
 .../server/ServerSetCertificateManagerTest.java |  51 ++
 .../server/ServerSetSecurityPoliciesTest.java   |  50 ++
 .../component/milo/testing/Application.java     |  72 +++
 .../milo/testing/Application2Client.java        |  59 ++
 .../milo/testing/Application2Server.java        |  72 +++
 .../camel-milo/src/test/resources/cert/Makefile |  16 +
 .../camel-milo/src/test/resources/cert/cert.ini |  13 +
 .../camel-milo/src/test/resources/cert/cert.p12 | Bin 0 -> 2461 bytes
 .../src/test/resources/cert/certificate.crt     |  20 +
 .../src/test/resources/cert/certificate.der     | Bin 0 -> 856 bytes
 .../src/test/resources/cert/privateKey.key      |  28 +
 .../src/test/resources/log4j2.properties        |  28 +
 components/pom.xml                              |   1 +
 parent/pom.xml                                  |   1 +
 .../camel-milo-starter/pom.xml                  |  51 ++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../src/main/resources/META-INF/spring.provides |  18 +
 .../spring-boot/components-starter/pom.xml      |   1 +
 58 files changed, 5252 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-milo/pom.xml b/components/camel-milo/pom.xml
new file mode 100644
index 0000000..5c597ba
--- /dev/null
+++ b/components/camel-milo/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>2.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-milo</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: Milo</name>
+    <description>Camel OPC UA support</description>
+
+    <properties>
+        <camel.osgi.export.pkg>
+            !*.internal.*,
+            org.apache.camel.component.milo.*
+        </camel.osgi.export.pkg>
+        <camel.osgi.import.before.defaults>
+            com.google.common.*;version="19.0.0"
+        </camel.osgi.import.before.defaults>
+        <camel.osgi.export.service>
+            org.apache.camel.spi.ComponentResolver;component=milo-client,
+            org.apache.camel.spi.ComponentResolver;component=milo-server
+        </camel.osgi.export.service>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-server</artifactId>
+            <version>${milo-version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-client</artifactId>
+            <version>${milo-version}</version>
+        </dependency>
+
+        <!-- testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- required due to issue eclipse/milo#23 -->
+                    <reuseForks>false</reuseForks>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/docs/milo-client.adoc
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/docs/milo-client.adoc b/components/camel-milo/src/main/docs/milo-client.adoc
new file mode 100644
index 0000000..60dfbcf
--- /dev/null
+++ b/components/camel-milo/src/main/docs/milo-client.adoc
@@ -0,0 +1,168 @@
+[[MiloClient-MiloClientComponent]]
+
+Milo Client Component
+~~~~~~~~~~~~~~~~~~~~~~~
+
+*Available as of Camel 2.18*
+
+The *Milo Client* component provides access to OPC UA servers using the
+http://eclipse.org/milo[Eclipse Milo\u2122] implementation.
+
+*Java 8*: This component requires Java 8 at runtime. 
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>de.dentrassi.camel</groupId>
+    <artifactId>de.dentrassi.camel.milo</artifactId>
+    <version><!-- your Apache Camel version--></version>
+</dependency>
+------------------------------------------------------------
+
+
+
+// component options: START
+The Milo based OPC UA Client component supports 5 options which are listed below.
+
+
+
+[width="100%",cols="2s,1m,8",options="header"]
+|=======================================================================
+| Name | Java Type | Description
+| defaultConfiguration | MiloClientConfiguration | All default options for client
+| applicationName | String | Default application name
+| applicationUri | String | Default application URI
+| productUri | String | Default product URI
+| reconnectTimeout | Long | Default reconnect timeout
+|=======================================================================
+// component options: END
+
+
+
+[[MiloClient-URIformat]]
+URI format
+^^^^^^^^^^
+
+The URI syntax of the endpoint is: 
+
+[source]
+------------------------
+milo-client:tcp://[user:password@]host:port/path/to/service?node=RAW(nsu=urn:foo:bar;s=item-1)
+------------------------
+
+If the server does not use a path, then it is possible to simply omit it:
+
+------------------------
+milo-client:tcp://[user:password@]host:port?node=RAW(nsu=urn:foo:bar;s=item-1)
+------------------------
+
+If no user credentials are provided the client will switch to anonymous mode.
+
+[[MiloClient-URIOptions]]
+URI options
+^^^^^^^^^^^
+
+All configuration options in the group +client+ are applicable to the shared client instance. Endpoints
+will share client instances for each endpoint URI. So the first time a request for that endpoint URI is
+made, the options of the +client+ group are applied. All further instances will be ignored.
+
+If you need alternate options for the same endpoint URI it is possible though to set the +clientId+ option
+which will by added internally to the endpoint URI in order to select a different shared connection instance.
+In other words, shared connections located by the combination of endpoint URI and client id.
+
+
+
+
+
+
+
+
+
+
+
+
+// endpoint options: START
+The Milo based OPC UA Client component supports 27 endpoint options which are listed below:
+
+[width="100%",cols="2s,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| endpointUri | common |  | String | *Required* The OPC UA server endpoint
+| clientId | common |  | String | A virtual client id to force the creation of a new connection instance
+| defaultAwaitWrites | common | false | boolean | Default await setting for writes
+| namespaceIndex | common |  | Integer | The namespace as numeric index deprecated
+| namespaceUri | common |  | String | The namespace as URI deprecated
+| node | common |  | ExpandedNodeId | The node definition (see Node ID)
+| nodeId | common |  | String | The node ID as string ID deprecated
+| samplingInterval | common |  | Double | The sampling interval in milliseconds
+| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
+| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+| applicationName | client | Apache Camel adapter for Eclipse Milo | String | The application name
+| applicationUri | client | http://camel.apache.org/EclipseMilo/Client | String | The application URI
+| channelLifetime | client |  | Long | Channel lifetime in milliseconds
+| keyAlias | client |  | String | The name of the key in the keystore file
+| keyPassword | client |  | String | The key password
+| keyStorePassword | client |  | String | The keystore password
+| keyStoreType | client |  | String | The key store type
+| keyStoreUrl | client |  | URL | The URL where the key should be loaded from
+| maxPendingPublishRequests | client |  | Long | The maximum number of pending publish requests
+| maxResponseMessageSize | client |  | Long | The maximum number of bytes a response message may have
+| productUri | client | http://camel.apache.org/EclipseMilo | String | The product URI
+| requestTimeout | client |  | Long | Request timeout in milliseconds
+| secureChannelReauthenticationEnabled | client |  | Boolean | Whether secure channel re-authentication is enabled
+| sessionName | client |  | String | Session name
+| sessionTimeout | client |  | Long | Session timeout in milliseconds
+|=======================================================================
+// endpoint options: END
+
+
+
+
+
+
+Node ID
++++++++
+
+
+In order to define a target node a namespace and node id is required. In previous versions this was possible by
+specifying `nodeId` and either `namespaceUri` or `namespaceIndex`. However this only allowed for using
+string based node IDs. And while this configuration is still possible, the newer one is preferred.
+
+The new approach is to specify a full namespace+node ID in the format `ns=1;i=1` which also allows to use the other
+node ID formats (like numeric, GUID/UUID or opaque). If the `node` parameter is used the older ones must not be used.
+The syntax of this node format is a set of `key=value` pairs delimited by a semi-colon (`;`). 
+
+Exactly one namespace and one node id key must be used. See the following table for possible keys:
+
+[width="100%",cols="2s,1,1m,1m,5",options="header"]
+|=======================================================================
+| Key | Type | Description
+| ns  | namespace | Numeric namespace index
+| nsu | namespace | Namespace URI
+| s   | node | String node ID
+| i   | node | Numeric node ID
+| g   | node | GUID/UUID node ID
+| b   | node | Base64 encoded string for opaque node ID
+|=======================================================================
+
+As the values generated by the syntax cannot be transparently encoded into a URI parameter value, it is necessary to escape
+them. However Camel allows to wrap the actual value inside `RAW(\u2026)`, which makes escaping unnecessary. For example:
+
+------------------------
+milo-client://user:password@localhost:12345?node=RAW(nsu=http://foo.bar;s=foo/bar)
+------------------------
+
+[[MiloClient-SeeAlso]]
+See Also
+^^^^^^^^
+
+* link:configuring-camel.html[Configuring Camel]
+* link:component.html[Component]
+* link:endpoint.html[Endpoint]
+* link:getting-started.html[Getting Started]
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/docs/milo-server.adoc
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/docs/milo-server.adoc b/components/camel-milo/src/main/docs/milo-server.adoc
new file mode 100644
index 0000000..3f699f6
--- /dev/null
+++ b/components/camel-milo/src/main/docs/milo-server.adoc
@@ -0,0 +1,111 @@
+[[MiloServer-OpcUaServerComponent]]
+
+Eclipse Milo Server Component
+~~~~~~~~~~~~~~~~~~~~~~~
+
+*Available as of Camel 2.18*
+
+The *Milo Server* component provides an OPC UA server using the
+http://eclipse.org/milo[Eclipse Milo\u2122] implementation.
+
+*Java 8*: This component requires Java 8 at runtime. 
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>de.dentrassi.camel</groupId>
+    <artifactId>de.dentrassi.camel.milo</artifactId>
+    <version><!-- your Apache Camel version--></version>
+</dependency>
+------------------------------------------------------------
+
+Messages sent to the endpoint from Camel will be available from the OPC UA server to OPC UA Clients.
+Value write requests from OPC UA Client will trigger messages which are sent into Apache Camel.
+
+
+
+
+// component options: START
+The OPC UA Server component supports 19 options which are listed below.
+
+
+
+[width="100%",cols="2s,1m,8",options="header"]
+|=======================================================================
+| Name | Java Type | Description
+| namespaceUri | String | The URI of the namespace defaults to urn:org:apache:camel
+| applicationName | String | The application name
+| applicationUri | String | The application URI
+| productUri | String | The product URI
+| bindPort | int | The TCP port the server binds to
+| strictEndpointUrlsEnabled | boolean | Set whether strict endpoint URLs are enforced
+| serverName | String | Server name
+| hostname | String | Server hostname
+| securityPolicies | Set | Security policies
+| securityPoliciesById | String> | Security policies by URI or name
+| userAuthenticationCredentials | String | Set user password combinations in the form of user1:pwd1user2:pwd2 Usernames and passwords will be URL decoded
+| enableAnonymousAuthentication | boolean | Enable anonymous authentication disabled by default
+| bindAddresses | String | Set the addresses of the local addresses the server should bind to
+| buildInfo | BuildInfo | Server build info
+| serverCertificate | Result | Server certificate
+| certificateManager | CertificateManager | Server certificate manager
+| certificateValidator | CertificateValidator> | Validator for client certificates
+| defaultCertificateValidator | File | Validator for client certificates using default file based approach
+| defaultCertificateExistingValidator | File | Validator for client certificates using default file based approach
+|=======================================================================
+// component options: END
+
+
+
+
+
+
+
+
+
+[[MiloServer-URIformat]]
+URI format
+^^^^^^^^^^
+
+[source,java]
+------------------------
+milo-server:itemId[?options]
+------------------------
+
+[[Milo-URIOptions]]
+URI options
+^^^^^^^^^^^
+
+
+
+// endpoint options: START
+The OPC UA Server component supports 5 endpoint options which are listed below:
+
+[width="100%",cols="2s,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| itemId | common |  | String | *Required* ID of the item
+| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
+| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+|=======================================================================
+// endpoint options: END
+
+
+
+
+
+
+[[MiloServer-SeeAlso]]
+See Also
+^^^^^^^^
+
+* link:configuring-camel.html[Configuring Camel]
+* link:component.html[Component]
+* link:endpoint.html[Endpoint]
+* link:getting-started.html[Getting Started]
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/KeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/KeyStoreLoader.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/KeyStoreLoader.java
new file mode 100644
index 0000000..1df24ee
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/KeyStoreLoader.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.NoSuchElementException;
+
+public class KeyStoreLoader {
+	public static final String DEFAULT_KEY_STORE_TYPE = "PKCS12";
+
+	private String type = DEFAULT_KEY_STORE_TYPE;
+	private URL url;
+	private String keyStorePassword;
+	private String keyPassword;
+	private String keyAlias;
+
+	public static class Result {
+
+		private final X509Certificate certificate;
+		private final KeyPair keyPair;
+
+		public Result(final X509Certificate certificate, final KeyPair keyPair) {
+			this.certificate = certificate;
+			this.keyPair = keyPair;
+		}
+
+		public X509Certificate getCertificate() {
+			return this.certificate;
+		}
+
+		public KeyPair getKeyPair() {
+			return this.keyPair;
+		}
+	}
+
+	public KeyStoreLoader() {
+	}
+
+	public void setType(final String type) {
+		this.type = type != null ? type : DEFAULT_KEY_STORE_TYPE;
+	}
+
+	public String getType() {
+		return this.type;
+	}
+
+	public void setUrl(final URL url) {
+		this.url = url;
+	}
+
+	public URL getUrl() {
+		return this.url;
+	}
+
+	public void setUrl(final String url) throws MalformedURLException {
+		this.url = new URL(url);
+	}
+
+	public void setKeyStorePassword(final String keyStorePassword) {
+		this.keyStorePassword = keyStorePassword;
+	}
+
+	public String getKeyStorePassword() {
+		return this.keyStorePassword;
+	}
+
+	public void setKeyPassword(final String keyPassword) {
+		this.keyPassword = keyPassword;
+	}
+
+	public String getKeyPassword() {
+		return this.keyPassword;
+	}
+
+	public void setKeyAlias(final String keyAlias) {
+		this.keyAlias = keyAlias;
+	}
+
+	public String getKeyAlias() {
+		return this.keyAlias;
+	}
+
+	public Result load() throws GeneralSecurityException, IOException {
+
+		final KeyStore keyStore = KeyStore.getInstance(this.type);
+
+		try (InputStream stream = this.url.openStream()) {
+			keyStore.load(stream, this.keyStorePassword != null ? this.keyStorePassword.toCharArray() : null);
+		}
+
+		String effectiveKeyAlias = this.keyAlias;
+
+		if (effectiveKeyAlias == null) {
+			if (keyStore.size() != 1) {
+				throw new IllegalArgumentException(
+						"Key store contains more than one key. The use of the 'keyAlias' parameter is required.");
+			}
+			try {
+				effectiveKeyAlias = keyStore.aliases().nextElement();
+			} catch (final NoSuchElementException e) {
+				throw new RuntimeException("Failed to enumerate key alias", e);
+			}
+		}
+
+		final Key privateKey = keyStore.getKey(effectiveKeyAlias,
+				this.keyPassword != null ? this.keyPassword.toCharArray() : null);
+
+		if (privateKey instanceof PrivateKey) {
+			final X509Certificate certificate = (X509Certificate) keyStore.getCertificate(effectiveKeyAlias);
+			if (certificate == null) {
+				return null;
+			}
+
+			final PublicKey publicKey = certificate.getPublicKey();
+			final KeyPair keyPair = new KeyPair(publicKey, (PrivateKey) privateKey);
+			return new Result(certificate, keyPair);
+		}
+
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
new file mode 100644
index 0000000..8383d7c
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/Messages.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+
+public final class Messages {
+	private Messages() {
+	}
+
+	/**
+	 * Fill a Message from a DataValue
+	 *
+	 * @param value
+	 *            the value to feed from
+	 * @param result
+	 *            the result to feed to
+	 */
+	public static void fillFromDataValue(final DataValue value, final DefaultMessage result) {
+		result.setBody(value);
+		result.setFault(value.getStatusCode().isBad());
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/NamespaceId.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/NamespaceId.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/NamespaceId.java
new file mode 100644
index 0000000..f16bbdc
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/NamespaceId.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
+
+public class NamespaceId {
+	private final String uri;
+	private final UShort numeric;
+
+	public NamespaceId(final String uri) {
+		requireNonNull(uri);
+
+		this.uri = uri;
+		this.numeric = null;
+	}
+
+	public NamespaceId(final UShort numeric) {
+		requireNonNull(numeric);
+
+		this.uri = null;
+		this.numeric = numeric;
+	}
+
+	public String getUri() {
+		return this.uri;
+	}
+
+	public UShort getNumeric() {
+		return this.numeric;
+	}
+
+	public boolean isNumeric() {
+		return this.numeric != null;
+	}
+
+	@Override
+	public String toString() {
+		if (isNumeric()) {
+			return String.format("[Namespace - numeric: %s]", this.numeric);
+		} else {
+			return String.format("[Namespace - URI: %s]", this.uri);
+		}
+	}
+
+	public Serializable getValue() {
+		return this.uri != null ? this.uri : this.numeric;
+	}
+
+	public static NamespaceId fromExpandedNodeId(final ExpandedNodeId id) {
+		if (id == null) {
+			return null;
+		}
+
+		if (id.getNamespaceUri() != null) {
+			return new NamespaceId(id.getNamespaceUri());
+		}
+		if (id.getNamespaceIndex() != null) {
+			return new NamespaceId(id.getNamespaceIndex());
+		}
+
+		throw new IllegalStateException(String.format("Unknown namespace type"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/PartialNodeId.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/PartialNodeId.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/PartialNodeId.java
new file mode 100644
index 0000000..0f3defd
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/PartialNodeId.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo;
+
+import static java.util.Objects.requireNonNull;
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ushort;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.IdType;
+
+import com.google.common.base.MoreObjects;
+
+public class PartialNodeId {
+
+	private IdType type;
+
+	private final Serializable id;
+
+	public PartialNodeId(final int id) {
+		this(uint(id));
+	}
+
+	public PartialNodeId(final UInteger id) {
+		requireNonNull(id);
+		this.id = id;
+	}
+
+	public PartialNodeId(final String id) {
+		requireNonNull(id);
+		this.id = id;
+	}
+
+	public PartialNodeId(final UUID id) {
+		requireNonNull(id);
+		this.id = id;
+	}
+
+	public PartialNodeId(final ByteString id) {
+		requireNonNull(id);
+		this.id = id;
+	}
+
+	public NodeId toNodeId(final int namespaceIndex) {
+		if (this.id instanceof String) {
+			return new NodeId(namespaceIndex, (String) this.id);
+		} else if (this.id instanceof UInteger) {
+			return new NodeId(ushort(namespaceIndex), (UInteger) this.id);
+		} else if (this.id instanceof ByteString) {
+			return new NodeId(namespaceIndex, (ByteString) this.id);
+		} else if (this.id instanceof UUID) {
+			return new NodeId(namespaceIndex, (UUID) this.id);
+		}
+		throw new IllegalStateException("Invalid id type: " + this.id);
+	}
+
+	public NodeId toNodeId(final UShort namespaceIndex) {
+		if (this.id instanceof String) {
+			return new NodeId(namespaceIndex, (String) this.id);
+		} else if (this.id instanceof UInteger) {
+			return new NodeId(namespaceIndex, (UInteger) this.id);
+		} else if (this.id instanceof ByteString) {
+			return new NodeId(namespaceIndex, (ByteString) this.id);
+		} else if (this.id instanceof UUID) {
+			return new NodeId(namespaceIndex, (UUID) this.id);
+		}
+		throw new IllegalStateException("Invalid id type: " + this.id);
+	}
+
+	@Override
+	public String toString() {
+		return MoreObjects.toStringHelper(this).add("type", this.type).add("id", this.id).toString();
+	}
+
+	public Serializable getValue() {
+		return this.id;
+	}
+
+	public static PartialNodeId fromExpandedNodeId(final ExpandedNodeId node) {
+		if (node == null) {
+			return null;
+		}
+
+		final Object value = node.getIdentifier();
+
+		if (value instanceof String) {
+			return new PartialNodeId((String) value);
+		} else if (value instanceof UInteger) {
+			return new PartialNodeId((UInteger) value);
+		} else if (value instanceof UUID) {
+			return new PartialNodeId((UUID) value);
+		} else if (value instanceof ByteString) {
+			return new PartialNodeId((ByteString) value);
+		}
+
+		throw new IllegalStateException(String.format("Unknown node id type: " + value));
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
new file mode 100644
index 0000000..2824e3e
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientComponent.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.milo.KeyStoreLoader;
+import org.apache.camel.component.milo.KeyStoreLoader.Result;
+import org.apache.camel.impl.DefaultComponent;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+public class MiloClientComponent extends DefaultComponent {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiloClientComponent.class);
+
+	private final Map<String, MiloClientConnection> cache = new HashMap<>();
+	private final Multimap<String, MiloClientEndpoint> connectionMap = HashMultimap.create();
+
+	private MiloClientConfiguration defaultConfiguration = new MiloClientConfiguration();
+
+	@Override
+	protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
+			throws Exception {
+
+		final MiloClientConfiguration configuration = new MiloClientConfiguration(this.defaultConfiguration);
+		configuration.setEndpointUri(remaining);
+		setProperties(configuration, parameters);
+
+		return createEndpoint(uri, configuration, parameters);
+	}
+
+	private synchronized MiloClientEndpoint createEndpoint(final String uri,
+			final MiloClientConfiguration configuration, final Map<String, Object> parameters) throws Exception {
+
+		MiloClientConnection connection = this.cache.get(configuration.toCacheId());
+
+		if (connection == null) {
+			LOG.info("Cache miss - creating new connection instance: {}", configuration.toCacheId());
+
+			connection = new MiloClientConnection(configuration, mapToClientConfiguration(configuration));
+			this.cache.put(configuration.toCacheId(), connection);
+		}
+
+		final MiloClientEndpoint endpoint = new MiloClientEndpoint(uri, this, connection,
+				configuration.getEndpointUri());
+
+		setProperties(endpoint, parameters);
+
+		// register connection with endpoint
+
+		this.connectionMap.put(configuration.toCacheId(), endpoint);
+
+		return endpoint;
+	}
+
+	private OpcUaClientConfigBuilder mapToClientConfiguration(final MiloClientConfiguration configuration) {
+		final OpcUaClientConfigBuilder builder = new OpcUaClientConfigBuilder();
+
+		whenHasText(configuration::getApplicationName,
+				value -> builder.setApplicationName(LocalizedText.english(value)));
+		whenHasText(configuration::getApplicationUri, builder::setApplicationUri);
+		whenHasText(configuration::getProductUri, builder::setProductUri);
+
+		if (configuration.getRequestTimeout() != null) {
+			builder.setRequestTimeout(Unsigned.uint(configuration.getRequestTimeout()));
+		}
+		if (configuration.getChannelLifetime() != null) {
+			builder.setChannelLifetime(Unsigned.uint(configuration.getChannelLifetime()));
+		}
+
+		whenHasText(configuration::getSessionName, value -> builder.setSessionName(() -> value));
+		if (configuration.getSessionTimeout() != null) {
+			builder.setSessionTimeout(UInteger.valueOf(configuration.getSessionTimeout()));
+		}
+
+		if (configuration.getMaxPendingPublishRequests() != null) {
+			builder.setMaxPendingPublishRequests(UInteger.valueOf(configuration.getMaxPendingPublishRequests()));
+		}
+
+		if (configuration.getMaxResponseMessageSize() != null) {
+			builder.setMaxResponseMessageSize(UInteger.valueOf(configuration.getMaxPendingPublishRequests()));
+		}
+
+		if (configuration.getSecureChannelReauthenticationEnabled() != null) {
+			builder.setSecureChannelReauthenticationEnabled(configuration.getSecureChannelReauthenticationEnabled());
+		}
+
+		if (configuration.getKeyStoreUrl() != null) {
+			setKey(configuration, builder);
+		}
+
+		return builder;
+	}
+
+	private void setKey(final MiloClientConfiguration configuration, final OpcUaClientConfigBuilder builder) {
+		final KeyStoreLoader loader = new KeyStoreLoader();
+
+		final Result result;
+		try {
+			// key store properties
+			loader.setType(configuration.getKeyStoreType());
+			loader.setUrl(configuration.getKeyStoreUrl());
+			loader.setKeyStorePassword(configuration.getKeyStorePassword());
+
+			// key properties
+			loader.setKeyAlias(configuration.getKeyAlias());
+			loader.setKeyPassword(configuration.getKeyPassword());
+
+			result = loader.load();
+		} catch (GeneralSecurityException | IOException e) {
+			throw new IllegalStateException("Failed to load key", e);
+		}
+
+		if (result == null) {
+			throw new IllegalStateException("Key not found in keystore");
+		}
+
+		builder.setCertificate(result.getCertificate());
+		builder.setKeyPair(result.getKeyPair());
+	}
+
+	private void whenHasText(final Supplier<String> valueSupplier, final Consumer<String> valueConsumer) {
+		final String value = valueSupplier.get();
+		if (value != null && !value.isEmpty()) {
+			valueConsumer.accept(value);
+		}
+	}
+
+	/**
+	 * All default options for client
+	 */
+	public void setDefaultConfiguration(final MiloClientConfiguration defaultConfiguration) {
+		this.defaultConfiguration = defaultConfiguration;
+	}
+
+	/**
+	 * Default application name
+	 */
+	public void setApplicationName(final String applicationName) {
+		this.defaultConfiguration.setApplicationName(applicationName);
+	}
+
+	/**
+	 * Default application URI
+	 */
+	public void setApplicationUri(final String applicationUri) {
+		this.defaultConfiguration.setApplicationUri(applicationUri);
+	}
+
+	/**
+	 * Default product URI
+	 */
+	public void setProductUri(final String productUri) {
+		this.defaultConfiguration.setProductUri(productUri);
+	}
+
+	/**
+	 * Default reconnect timeout
+	 */
+	public void setReconnectTimeout(final Long reconnectTimeout) {
+		this.defaultConfiguration.setRequestTimeout(reconnectTimeout);
+	}
+
+	public synchronized void disposed(final MiloClientEndpoint endpoint) {
+
+		final MiloClientConnection connection = endpoint.getConnection();
+
+		// unregister usage of connection
+
+		this.connectionMap.remove(connection.getConnectionId(), endpoint);
+
+		// test if this was the last endpoint using this connection
+
+		if (!this.connectionMap.containsKey(connection.getConnectionId())) {
+
+			// this was the last endpoint using the connection ...
+
+			// ... remove from the cache
+
+			this.cache.remove(connection.getConnectionId());
+
+			// ... and close
+
+			try {
+				connection.close();
+			} catch (final Exception e) {
+				LOG.warn("Failed to close connection", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConfiguration.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConfiguration.java
new file mode 100644
index 0000000..cf9905e
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConfiguration.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.camel.component.milo.KeyStoreLoader;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class MiloClientConfiguration implements Cloneable {
+
+	private static final String DEFAULT_APPLICATION_URI = "http://camel.apache.org/EclipseMilo/Client";
+
+	private static final String DEFAULT_APPLICATION_NAME = "Apache Camel adapter for Eclipse Milo";
+
+	private static final String DEFAULT_PRODUCT_URI = "http://camel.apache.org/EclipseMilo";
+
+	private String endpointUri;
+
+	@UriParam
+	private String clientId;
+
+	@UriParam(label = "client", defaultValue = DEFAULT_APPLICATION_NAME)
+	private String applicationName = DEFAULT_APPLICATION_NAME;
+
+	@UriParam(label = "client", defaultValue = DEFAULT_APPLICATION_URI)
+	private String applicationUri = DEFAULT_APPLICATION_URI;
+
+	@UriParam(label = "client", defaultValue = DEFAULT_PRODUCT_URI)
+	private String productUri = DEFAULT_PRODUCT_URI;
+
+	@UriParam(label = "client")
+	private Long requestTimeout;
+
+	@UriParam(label = "client")
+	private Long channelLifetime;
+
+	@UriParam(label = "client")
+	private String sessionName;
+
+	@UriParam(label = "client")
+	private Long sessionTimeout;
+
+	@UriParam(label = "client")
+	private Long maxPendingPublishRequests;
+
+	@UriParam(label = "client")
+	private Long maxResponseMessageSize;
+
+	@UriParam(label = "client")
+	private Boolean secureChannelReauthenticationEnabled;
+
+	@UriParam(label = "client")
+	private URL keyStoreUrl;
+
+	@UriParam(label = "client")
+	private String keyStoreType = KeyStoreLoader.DEFAULT_KEY_STORE_TYPE;
+
+	@UriParam(label = "client")
+	private String keyAlias;
+
+	@UriParam(label = "client")
+	private String keyStorePassword;
+
+	@UriParam(label = "client")
+	private String keyPassword;
+
+	public MiloClientConfiguration() {
+	}
+
+	public MiloClientConfiguration(final MiloClientConfiguration other) {
+		this.clientId = other.clientId;
+		this.endpointUri = other.endpointUri;
+		this.applicationName = other.applicationName;
+		this.productUri = other.productUri;
+		this.requestTimeout = other.requestTimeout;
+	}
+
+	public void setEndpointUri(final String endpointUri) {
+		this.endpointUri = endpointUri;
+	}
+
+	public String getEndpointUri() {
+		return this.endpointUri;
+	}
+
+	/**
+	 * A virtual client id to force the creation of a new connection instance
+	 */
+	public void setClientId(final String clientId) {
+		this.clientId = clientId;
+	}
+
+	public String getClientId() {
+		return this.clientId;
+	}
+
+	/**
+	 * The application name
+	 */
+	public void setApplicationName(final String applicationName) {
+		this.applicationName = applicationName;
+	}
+
+	public String getApplicationName() {
+		return this.applicationName;
+	}
+
+	/**
+	 * The application URI
+	 */
+	public void setApplicationUri(final String applicationUri) {
+		this.applicationUri = applicationUri;
+	}
+
+	public String getApplicationUri() {
+		return this.applicationUri;
+	}
+
+	/**
+	 * The product URI
+	 */
+	public void setProductUri(final String productUri) {
+		this.productUri = productUri;
+	}
+
+	public String getProductUri() {
+		return this.productUri;
+	}
+
+	/**
+	 * Request timeout in milliseconds
+	 */
+	public void setRequestTimeout(final Long reconnectTimeout) {
+		this.requestTimeout = reconnectTimeout;
+	}
+
+	public Long getRequestTimeout() {
+		return this.requestTimeout;
+	}
+
+	/**
+	 * Channel lifetime in milliseconds
+	 */
+	public void setChannelLifetime(final Long channelLifetime) {
+		this.channelLifetime = channelLifetime;
+	}
+
+	public Long getChannelLifetime() {
+		return this.channelLifetime;
+	}
+
+	/**
+	 * Session name
+	 */
+	public void setSessionName(final String sessionName) {
+		this.sessionName = sessionName;
+	}
+
+	public String getSessionName() {
+		return this.sessionName;
+	}
+
+	/**
+	 * Session timeout in milliseconds
+	 */
+	public void setSessionTimeout(final Long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
+
+	public Long getSessionTimeout() {
+		return this.sessionTimeout;
+	}
+
+	/**
+	 * The maximum number of pending publish requests
+	 */
+	public void setMaxPendingPublishRequests(final Long maxPendingPublishRequests) {
+		this.maxPendingPublishRequests = maxPendingPublishRequests;
+	}
+
+	public Long getMaxPendingPublishRequests() {
+		return this.maxPendingPublishRequests;
+	}
+
+	/**
+	 * The maximum number of bytes a response message may have
+	 */
+	public void setMaxResponseMessageSize(final Long maxResponseMessageSize) {
+		this.maxResponseMessageSize = maxResponseMessageSize;
+	}
+
+	public Long getMaxResponseMessageSize() {
+		return this.maxResponseMessageSize;
+	}
+
+	/**
+	 * Whether secure channel re-authentication is enabled
+	 */
+	public void setSecureChannelReauthenticationEnabled(final Boolean secureChannelReauthenticationEnabled) {
+		this.secureChannelReauthenticationEnabled = secureChannelReauthenticationEnabled;
+	}
+
+	public Boolean getSecureChannelReauthenticationEnabled() {
+		return this.secureChannelReauthenticationEnabled;
+	}
+
+	/**
+	 * The URL where the key should be loaded from
+	 */
+	public void setKeyStoreUrl(final String keyStoreUrl) throws MalformedURLException {
+		this.keyStoreUrl = keyStoreUrl != null ? new URL(keyStoreUrl) : null;
+	}
+
+	public URL getKeyStoreUrl() {
+		return this.keyStoreUrl;
+	}
+
+	/**
+	 * The key store type
+	 */
+	public void setKeyStoreType(final String keyStoreType) {
+		this.keyStoreType = keyStoreType;
+	}
+
+	public String getKeyStoreType() {
+		return this.keyStoreType;
+	}
+
+	/**
+	 * The name of the key in the keystore file
+	 */
+	public void setKeyAlias(final String keyAlias) {
+		this.keyAlias = keyAlias;
+	}
+
+	public String getKeyAlias() {
+		return this.keyAlias;
+	}
+
+	/**
+	 * The keystore password
+	 */
+	public void setKeyStorePassword(final String keyStorePassword) {
+		this.keyStorePassword = keyStorePassword;
+	}
+
+	public String getKeyStorePassword() {
+		return this.keyStorePassword;
+	}
+
+	/**
+	 * The key password
+	 */
+	public void setKeyPassword(final String keyPassword) {
+		this.keyPassword = keyPassword;
+	}
+
+	public String getKeyPassword() {
+		return this.keyPassword;
+	}
+
+	@Override
+	public MiloClientConfiguration clone() {
+		return new MiloClientConfiguration(this);
+	}
+
+	public String toCacheId() {
+		if (this.clientId != null && !this.clientId.isEmpty()) {
+			return this.endpointUri + "|" + this.clientId;
+		} else {
+			return this.endpointUri;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
new file mode 100644
index 0000000..bc67c88
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import org.apache.camel.component.milo.NamespaceId;
+import org.apache.camel.component.milo.PartialNodeId;
+import org.apache.camel.component.milo.client.internal.SubscriptionManager;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.stack.core.Stack;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+
+public class MiloClientConnection implements AutoCloseable {
+
+	private final MiloClientConfiguration configuration;
+
+	private SubscriptionManager manager;
+
+	private boolean initialized;
+
+	private final OpcUaClientConfigBuilder clientConfiguration;
+
+	public MiloClientConnection(final MiloClientConfiguration configuration,
+			final OpcUaClientConfigBuilder clientConfiguration) {
+		requireNonNull(configuration);
+
+		// make a copy since the configuration is mutable
+		this.configuration = configuration.clone();
+		this.clientConfiguration = clientConfiguration;
+	}
+
+	protected void init() throws Exception {
+		this.manager = new SubscriptionManager(this.configuration, this.clientConfiguration,
+				Stack.sharedScheduledExecutor(), 10_000);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (this.manager != null) {
+			this.manager.dispose();
+			this.manager = null;
+		}
+	}
+
+	protected synchronized void checkInit() {
+		if (this.initialized) {
+			return;
+		}
+
+		try {
+			init();
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+		this.initialized = true;
+	}
+
+	@FunctionalInterface
+	public interface MonitorHandle {
+		public void unregister();
+	}
+
+	public MonitorHandle monitorValue(final MiloClientItemConfiguration configuration,
+			final Consumer<DataValue> valueConsumer) {
+
+		requireNonNull(configuration);
+		requireNonNull(valueConsumer);
+
+		checkInit();
+
+		final NamespaceId namespaceId = configuration.makeNamespaceId();
+		final PartialNodeId partialNodeId = configuration.makePartialNodeId();
+
+		final UInteger handle = this.manager.registerItem(namespaceId, partialNodeId,
+				configuration.getSamplingInterval(), valueConsumer);
+
+		return () -> MiloClientConnection.this.manager.unregisterItem(handle);
+	}
+
+	public String getConnectionId() {
+		return this.configuration.toCacheId();
+	}
+
+	public void writeValue(final NamespaceId namespaceId, final PartialNodeId partialNodeId, final Object value,
+			final boolean await) {
+		checkInit();
+
+		this.manager.write(namespaceId, partialNodeId, mapValue(value), await);
+	}
+
+	/**
+	 * Map the incoming value to some value writable to the milo client
+	 *
+	 * @param value
+	 *            the incoming value
+	 * @return the outgoing value
+	 */
+	private DataValue mapValue(final Object value) {
+		if (value instanceof DataValue) {
+			return (DataValue) value;
+		}
+		if (value instanceof Variant) {
+			return new DataValue((Variant) value);
+		}
+		return new DataValue(new Variant(value));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
new file mode 100644
index 0000000..b0e5d75
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import java.util.Objects;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.milo.Messages;
+import org.apache.camel.component.milo.client.MiloClientConnection.MonitorHandle;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.DefaultMessage;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MiloClientConsumer extends DefaultConsumer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiloClientConsumer.class);
+
+	private final MiloClientConnection connection;
+
+	private final MiloClientItemConfiguration configuration;
+
+	private MonitorHandle handle;
+
+	public MiloClientConsumer(final MiloClientEndpoint endpoint, final Processor processor,
+			final MiloClientConnection connection, final MiloClientItemConfiguration configuration) {
+		super(endpoint, processor);
+
+		Objects.requireNonNull(connection);
+		Objects.requireNonNull(configuration);
+
+		this.connection = connection;
+		this.configuration = configuration;
+	}
+
+	@Override
+	protected void doStart() throws Exception {
+		super.doStart();
+
+		this.handle = this.connection.monitorValue(this.configuration, this::handleValueUpdate);
+	}
+
+	@Override
+	protected void doStop() throws Exception {
+		if (this.handle != null) {
+			this.handle.unregister();
+			this.handle = null;
+		}
+
+		super.doStop();
+	}
+
+	private void handleValueUpdate(final DataValue value) {
+		final Exchange exchange = getEndpoint().createExchange();
+		exchange.setIn(mapMessage(value));
+		try {
+			getAsyncProcessor().process(exchange);
+		} catch (final Exception e) {
+			LOG.debug("Failed to process message", e);
+		}
+	}
+
+	private Message mapMessage(final DataValue value) {
+		if (value == null) {
+			return null;
+		}
+
+		final DefaultMessage result = new DefaultMessage();
+
+		Messages.fillFromDataValue(value, result);
+
+		return result;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
new file mode 100644
index 0000000..3e0742d
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ushort;
+
+import java.util.Objects;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.milo.NamespaceId;
+import org.apache.camel.component.milo.PartialNodeId;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+
+@UriEndpoint(scheme = "milo-client", syntax = "milo-client:tcp://user:password@host:port/path/to/service?itemId=item.id&namespaceUri=urn:foo:bar", title = "Milo based OPC UA Client", consumerClass = MiloClientConsumer.class, label = "iot")
+public class MiloClientEndpoint extends DefaultEndpoint implements MiloClientItemConfiguration {
+
+	/**
+	 * The OPC UA server endpoint
+	 */
+	@UriPath
+	@Metadata(required = "true")
+	private final String endpointUri;
+
+	/**
+	 * The node ID as string ID **deprecated**
+	 *
+	 * @deprecated Use "node" instead
+	 */
+	@UriParam
+	@Deprecated
+	private String nodeId;
+
+	/**
+	 * The namespace as URI **deprecated**
+	 *
+	 * @deprecated Use "node" instead
+	 */
+	@UriParam
+	@Deprecated
+	private String namespaceUri;
+
+	/**
+	 * The namespace as numeric index **deprecated**
+	 *
+	 * @deprecated Use "node" instead
+	 */
+	@UriParam
+	@Deprecated
+	private Integer namespaceIndex;
+
+	/**
+	 * The node definition (see Node ID)
+	 */
+	@UriParam
+	private ExpandedNodeId node;
+
+	/**
+	 * The sampling interval in milliseconds
+	 */
+	@UriParam
+	private Double samplingInterval;
+
+	/**
+	 * The client configuration
+	 */
+	@UriParam
+	private MiloClientConfiguration client;
+
+	/**
+	 * Default "await" setting for writes
+	 */
+	@UriParam
+	private boolean defaultAwaitWrites = false;
+
+	private final MiloClientConnection connection;
+	private final MiloClientComponent component;
+
+	public MiloClientEndpoint(final String uri, final MiloClientComponent component,
+			final MiloClientConnection connection, final String endpointUri) {
+		super(uri, component);
+
+		Objects.requireNonNull(component);
+		Objects.requireNonNull(connection);
+		Objects.requireNonNull(endpointUri);
+
+		this.endpointUri = endpointUri;
+
+		this.component = component;
+		this.connection = connection;
+	}
+
+	@Override
+	protected void doStart() throws Exception {
+		super.doStart();
+	}
+
+	@Override
+	protected void doStop() throws Exception {
+		this.component.disposed(this);
+		super.doStop();
+	}
+
+	@Override
+	public Producer createProducer() throws Exception {
+		return new MiloClientProducer(this, this.connection, this, this.defaultAwaitWrites);
+	}
+
+	@Override
+	public Consumer createConsumer(final Processor processor) throws Exception {
+		return new MiloClientConsumer(this, processor, this.connection, this);
+	}
+
+	@Override
+	public boolean isSingleton() {
+		return true;
+	}
+
+	public MiloClientConnection getConnection() {
+		return this.connection;
+	}
+
+	// item configuration
+
+	@Override
+	public PartialNodeId makePartialNodeId() {
+		PartialNodeId result = null;
+
+		if (this.node != null) {
+			result = PartialNodeId.fromExpandedNodeId(this.node);
+		}
+
+		if (result == null && this.nodeId != null) {
+			result = new PartialNodeId(this.nodeId);
+		}
+
+		if (result == null) {
+			throw new IllegalStateException("Missing or invalid node id configuration");
+		} else {
+			return result;
+		}
+	}
+
+	@Override
+	public NamespaceId makeNamespaceId() {
+		NamespaceId result = null;
+
+		if (this.node != null) {
+			result = NamespaceId.fromExpandedNodeId(this.node);
+		}
+
+		if (result == null && this.namespaceIndex != null) {
+			result = new NamespaceId(ushort(this.namespaceIndex));
+		}
+		if (result == null && this.namespaceUri != null) {
+			result = new NamespaceId(this.namespaceUri);
+		}
+
+		if (result == null) {
+			throw new IllegalStateException("Missing or invalid node id configuration");
+		} else {
+			return result;
+		}
+	}
+
+	public String getNodeId() {
+		return this.nodeId;
+	}
+
+	public void setNodeId(final String nodeId) {
+		this.nodeId = nodeId;
+	}
+
+	public String getNamespaceUri() {
+		return this.namespaceUri;
+	}
+
+	public void setNamespaceUri(final String namespaceUri) {
+		this.namespaceUri = namespaceUri;
+	}
+
+	public Integer getNamespaceIndex() {
+		return this.namespaceIndex;
+	}
+
+	public void setNamespaceIndex(final int namespaceIndex) {
+		this.namespaceIndex = namespaceIndex;
+	}
+
+	public void setNode(final String node) {
+		if (node == null) {
+			this.node = null;
+		} else {
+			this.node = ExpandedNodeId.parse(node);
+		}
+	}
+
+	public String getNode() {
+		if (this.node != null) {
+			return this.node.toParseableString();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public Double getSamplingInterval() {
+		return this.samplingInterval;
+	}
+
+	public void setSamplingInterval(final Double samplingInterval) {
+		this.samplingInterval = samplingInterval;
+	}
+
+	public boolean isDefaultAwaitWrites() {
+		return this.defaultAwaitWrites;
+	}
+
+	public void setDefaultAwaitWrites(final boolean defaultAwaitWrites) {
+		this.defaultAwaitWrites = defaultAwaitWrites;
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientItemConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientItemConfiguration.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientItemConfiguration.java
new file mode 100644
index 0000000..8664ef7
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientItemConfiguration.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import org.apache.camel.component.milo.NamespaceId;
+import org.apache.camel.component.milo.PartialNodeId;
+
+public interface MiloClientItemConfiguration {
+	public NamespaceId makeNamespaceId();
+
+	public PartialNodeId makePartialNodeId();
+
+	public Double getSamplingInterval();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
new file mode 100644
index 0000000..edc5718
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2016 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.milo.NamespaceId;
+import org.apache.camel.component.milo.PartialNodeId;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MiloClientProducer extends DefaultProducer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiloClientProducer.class);
+
+	private final MiloClientConnection connection;
+
+	private final NamespaceId namespaceId;
+
+	private final PartialNodeId partialNodeId;
+
+	private final boolean defaultAwaitWrites;
+
+	public MiloClientProducer(final Endpoint endpoint, final MiloClientConnection connection,
+			final MiloClientItemConfiguration configuration, final boolean defaultAwaitWrites) {
+		super(endpoint);
+
+		this.connection = connection;
+		this.defaultAwaitWrites = defaultAwaitWrites;
+
+		this.namespaceId = configuration.makeNamespaceId();
+		this.partialNodeId = configuration.makePartialNodeId();
+	}
+
+	@Override
+	public void process(final Exchange exchange) throws Exception {
+		final Message msg = exchange.getIn();
+		final Object value = msg.getBody();
+
+		LOG.debug("Processing message: {}", value);
+
+		final Boolean await = msg.getHeader("await", this.defaultAwaitWrites, Boolean.class);
+
+		this.connection.writeValue(this.namespaceId, this.partialNodeId, value, await != null ? await : false);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b3837fa/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
new file mode 100644
index 0000000..95acfe4
--- /dev/null
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
@@ -0,0 +1,570 @@
+/*
+ * Copyright (C) 2016, 2017 Jens Reimann <jr...@redhat.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.milo.client.internal;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import org.apache.camel.component.milo.NamespaceId;
+import org.apache.camel.component.milo.PartialNodeId;
+import org.apache.camel.component.milo.client.MiloClientConfiguration;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.CompositeProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager.SubscriptionListener;
+import org.eclipse.milo.opcua.stack.client.UaTcpStackClient;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionManager {
+
+	private final static Logger LOG = LoggerFactory.getLogger(SubscriptionManager.class);
+
+	private final AtomicLong clientHandleCounter = new AtomicLong(0);
+
+	private final class SubscriptionListenerImpl implements SubscriptionListener {
+		@Override
+		public void onSubscriptionTransferFailed(final UaSubscription subscription, final StatusCode statusCode) {
+			LOG.info("Transfer failed {} : {}", subscription.getSubscriptionId(), statusCode);
+
+			// we simply tear it down and build it up again
+			handleConnectionFailue(new RuntimeException("Subscription failed to reconnect"));
+		}
+
+		@Override
+		public void onStatusChanged(final UaSubscription subscription, final StatusCode status) {
+			LOG.info("Subscription status changed {} : {}", subscription.getSubscriptionId(), status);
+		}
+
+		@Override
+		public void onPublishFailure(final UaException exception) {
+		}
+
+		@Override
+		public void onNotificationDataLost(final UaSubscription subscription) {
+		}
+
+		@Override
+		public void onKeepAlive(final UaSubscription subscription, final DateTime publishTime) {
+		}
+	}
+
+	public interface Worker<T> {
+		public void work(T on) throws Exception;
+	}
+
+	private static class Subscription {
+		private final NamespaceId namespaceId;
+		private final PartialNodeId partialNodeId;
+		private final Double samplingInterval;
+
+		private final Consumer<DataValue> valueConsumer;
+
+		public Subscription(final NamespaceId namespaceId, final PartialNodeId partialNodeId,
+				final Double samplingInterval, final Consumer<DataValue> valueConsumer) {
+			this.namespaceId = namespaceId;
+			this.partialNodeId = partialNodeId;
+			this.samplingInterval = samplingInterval;
+			this.valueConsumer = valueConsumer;
+		}
+
+		public NamespaceId getNamespaceId() {
+			return this.namespaceId;
+		}
+
+		public PartialNodeId getPartialNodeId() {
+			return this.partialNodeId;
+		}
+
+		public Double getSamplingInterval() {
+			return this.samplingInterval;
+		}
+
+		public Consumer<DataValue> getValueConsumer() {
+			return this.valueConsumer;
+		}
+	}
+
+	private class Connected {
+		private OpcUaClient client;
+		private final UaSubscription manager;
+
+		private final Map<UInteger, Subscription> badSubscriptions = new HashMap<>();
+
+		private final Map<UInteger, UaMonitoredItem> goodSubscriptions = new HashMap<>();
+
+		private final Map<String, UShort> namespaceCache = new ConcurrentHashMap<>();
+
+		public Connected(final OpcUaClient client, final UaSubscription manager) {
+			this.client = client;
+			this.manager = manager;
+		}
+
+		public void putSubscriptions(final Map<UInteger, Subscription> subscriptions) throws Exception {
+
+			if (subscriptions.isEmpty()) {
+				return;
+			}
+
+			// convert to requests
+
+			final List<MonitoredItemCreateRequest> items = new ArrayList<>(subscriptions.size());
+
+			for (final Map.Entry<UInteger, Subscription> entry : subscriptions.entrySet()) {
+				final Subscription s = entry.getValue();
+
+				UShort namespaceIndex;
+				if (s.getNamespaceId().isNumeric()) {
+					namespaceIndex = s.getNamespaceId().getNumeric();
+				} else {
+					namespaceIndex = lookupNamespace(s.getNamespaceId().getUri());
+				}
+
+				if (namespaceIndex == null) {
+					handleSubscriptionError(new StatusCode(StatusCodes.Bad_InvalidArgument), entry.getKey(), s);
+				} else {
+					final NodeId nodeId = s.getPartialNodeId().toNodeId(namespaceIndex);
+					final ReadValueId itemId = new ReadValueId(nodeId, AttributeId.Value.uid(), null,
+							QualifiedName.NULL_VALUE);
+					final MonitoringParameters parameters = new MonitoringParameters(entry.getKey(),
+							s.getSamplingInterval(), null, null, null);
+					items.add(new MonitoredItemCreateRequest(itemId, MonitoringMode.Reporting, parameters));
+				}
+			}
+
+			if (!items.isEmpty())
+
+			{
+
+				// create monitors
+
+				this.manager.createMonitoredItems(TimestampsToReturn.Both, items, (item, idx) -> {
+
+					// set value listener
+
+					final Subscription s = subscriptions.get(item.getClientHandle());
+
+					if (item.getStatusCode().isBad()) {
+						handleSubscriptionError(item.getStatusCode(), item.getClientHandle(), s);
+					} else {
+						this.goodSubscriptions.put(item.getClientHandle(), item);
+						item.setValueConsumer(s.getValueConsumer());
+					}
+
+				}).get();
+			}
+
+			if (!this.badSubscriptions.isEmpty()) {
+				SubscriptionManager.this.executor.schedule(this::resubscribe, SubscriptionManager.this.reconnectTimeout,
+						TimeUnit.MILLISECONDS);
+			}
+		}
+
+		private void handleSubscriptionError(final StatusCode statusCode, final UInteger clientHandle,
+				final Subscription s) {
+			this.badSubscriptions.put(clientHandle, s);
+			s.getValueConsumer().accept(new DataValue(statusCode));
+		}
+
+		private void resubscribe() {
+			final Map<UInteger, Subscription> subscriptions = new HashMap<>(this.badSubscriptions);
+			this.badSubscriptions.clear();
+			try {
+				putSubscriptions(subscriptions);
+			} catch (final Exception e) {
+				handleConnectionFailue(e);
+			}
+		}
+
+		public void activate(final UInteger clientHandle, final Subscription subscription) throws Exception {
+			putSubscriptions(Collections.singletonMap(clientHandle, subscription));
+		}
+
+		public void deactivate(final UInteger clientHandle) throws Exception {
+			final UaMonitoredItem item = this.goodSubscriptions.remove(clientHandle);
+			if (item != null) {
+				this.manager.deleteMonitoredItems(Collections.singletonList(item)).get();
+			} else {
+				this.badSubscriptions.remove(clientHandle);
+			}
+		}
+
+		private UShort lookupNamespace(final String namespaceUri) throws Exception {
+			return lookupNamespaceIndex(namespaceUri).get();
+		}
+
+		private CompletableFuture<UShort> lookupNamespaceIndex(final String namespaceUri) {
+
+			LOG.trace("Looking up namespace: {}", namespaceUri);
+
+			// check cache
+			{
+				final UShort result = this.namespaceCache.get(namespaceUri);
+				if (result != null) {
+					LOG.trace("Found namespace in cache: {} -> {}", namespaceUri, result);
+					return CompletableFuture.completedFuture(result);
+				}
+			}
+
+			/*
+			 * We always read the server side table since the cache did not help
+			 * us and the namespace might have been added to the server at a
+			 * later time.
+			 */
+
+			LOG.debug("Looking up namespace on server: {}", namespaceUri);
+
+			final CompletableFuture<DataValue> future = this.client.readValue(0, TimestampsToReturn.Neither,
+					Identifiers.Server_NamespaceArray);
+
+			return future.thenApply(value -> {
+				final Object rawValue = value.getValue().getValue();
+
+				if (rawValue instanceof String[]) {
+					final String[] namespaces = (String[]) rawValue;
+					for (int i = 0; i < namespaces.length; i++) {
+						if (namespaces[i].equals(namespaceUri)) {
+							final UShort result = Unsigned.ushort(i);
+							this.namespaceCache.putIfAbsent(namespaceUri, result);
+							return result;
+						}
+					}
+				}
+				return null;
+			});
+		}
+
+		public void dispose() {
+			if (this.client != null) {
+				this.client.disconnect();
+				this.client = null;
+			}
+		}
+
+		public CompletableFuture<StatusCode> write(final NamespaceId namespaceId, final PartialNodeId partialNodeId,
+				final DataValue value) {
+
+			final CompletableFuture<UShort> future;
+
+			LOG.trace("Namespace: {}", namespaceId);
+			if (namespaceId.isNumeric()) {
+				LOG.trace("Using provided index: {}", namespaceId.getNumeric());
+				future = CompletableFuture.completedFuture(namespaceId.getNumeric());
+			} else {
+				LOG.trace("Looking up namespace: {}", namespaceId.getUri());
+				future = lookupNamespaceIndex(namespaceId.getUri());
+			}
+
+			return future.thenCompose(index -> {
+
+				final NodeId nodeId = partialNodeId.toNodeId(index);
+				LOG.debug("Node - partial: {}, full: {}", partialNodeId, nodeId);
+
+				return this.client.writeValue(nodeId, value).whenComplete((status, error) -> {
+					if (status != null) {
+						LOG.debug("Write to ns={}/{}, id={} = {} -> {}", namespaceId, index, nodeId, value, status);
+					} else {
+						LOG.debug("Failed to write", error);
+					}
+				});
+
+			});
+		}
+
+	}
+
+	private final MiloClientConfiguration configuration;
+	private final OpcUaClientConfigBuilder clientBuilder;
+	private final ScheduledExecutorService executor;
+	private final long reconnectTimeout;
+
+	private Connected connected;
+	private boolean disposed;
+	private Future<?> reconnectJob;
+	private final Map<UInteger, Subscription> subscriptions = new HashMap<>();
+
+	public SubscriptionManager(final MiloClientConfiguration configuration,
+			final OpcUaClientConfigBuilder clientBuilder, final ScheduledExecutorService executor,
+			final long reconnectTimeout) {
+
+		this.configuration = configuration;
+		this.clientBuilder = clientBuilder;
+		this.executor = executor;
+		this.reconnectTimeout = reconnectTimeout;
+
+		connect();
+	}
+
+	private synchronized void handleConnectionFailue(final Throwable e) {
+		if (this.connected != null) {
+			this.connected.dispose();
+			this.connected = null;
+		}
+
+		// log
+
+		LOG.info("Connection failed", e);
+
+		// always trigger re-connect
+
+		triggerReconnect(true);
+	}
+
+	private void connect() {
+		LOG.info("Starting connect");
+
+		synchronized (this) {
+			this.reconnectJob = null;
+
+			if (this.disposed) {
+				// we woke up disposed
+				return;
+			}
+		}
+
+		try {
+			final Connected connected = performConnect();
+			LOG.debug("Connect call done");
+			synchronized (this) {
+				if (this.disposed) {
+					// we got disposed during connect
+					return;
+				}
+
+				try {
+					LOG.debug("Setting subscriptions: {}", this.subscriptions.size());
+					connected.putSubscriptions(this.subscriptions);
+
+					LOG.debug("Update state : {} -> {}", this.connected, connected);
+					final Connected oldConnected = this.connected;
+					this.connected = connected;
+
+					if (oldConnected != null) {
+						LOG.debug("Dispose old state");
+						oldConnected.dispose();
+					}
+
+				} catch (final Exception e) {
+					LOG.info("Failed to set subscriptions", e);
+					connected.dispose();
+					throw e;
+				}
+			}
+		} catch (final Exception e) {
+			LOG.info("Failed to connect", e);
+			triggerReconnect(false);
+		}
+	}
+
+	public void dispose() {
+		Connected connected;
+
+		synchronized (this) {
+			if (this.disposed) {
+				return;
+			}
+			this.disposed = true;
+			connected = this.connected;
+		}
+
+		if (connected != null) {
+			// dispose outside of lock
+			connected.dispose();
+		}
+	}
+
+	private synchronized void triggerReconnect(final boolean immediate) {
+		LOG.info("Trigger re-connect (immediate: {})", immediate);
+
+		if (this.reconnectJob != null) {
+			LOG.info("Re-connect already scheduled");
+			return;
+		}
+
+		if (immediate) {
+			this.reconnectJob = this.executor.submit(this::connect);
+		} else {
+			this.reconnectJob = this.executor.schedule(this::connect, this.reconnectTimeout, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	private Connected performConnect() throws Exception {
+		final EndpointDescription endpoint = UaTcpStackClient.getEndpoints(this.configuration.getEndpointUri())
+				.thenApply(endpoints -> {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Found enpoints:");
+						for (final EndpointDescription ep : endpoints) {
+							LOG.debug("\t{}", ep);
+						}
+					}
+
+					return findEndpoint(endpoints);
+				}).get();
+
+		LOG.debug("Selected endpoint: {}", endpoint);
+
+		final URI uri = URI.create(this.configuration.getEndpointUri());
+
+		// set identity providers
+
+		final List<IdentityProvider> providers = new LinkedList<>();
+
+		final String user = uri.getUserInfo();
+		if (user != null && !user.isEmpty()) {
+			final String[] creds = user.split(":", 2);
+			if (creds != null && creds.length == 2) {
+				LOG.debug("Enable username/password provider: {}", creds[0]);
+			}
+			providers.add(new UsernameProvider(creds[0], creds[1]));
+		}
+
+		// FIXME: need a way to clone
+		final OpcUaClientConfigBuilder cfg = this.clientBuilder;
+
+		providers.add(new AnonymousProvider());
+		cfg.setIdentityProvider(new CompositeProvider(providers));
+
+		// set endpoint
+
+		cfg.setEndpoint(endpoint);
+
+		final OpcUaClient client = new OpcUaClient(cfg.build());
+
+		try {
+			final UaSubscription manager = client.getSubscriptionManager().createSubscription(1_000.0).get();
+			client.getSubscriptionManager().addSubscriptionListener(new SubscriptionListenerImpl());
+
+			return new Connected(client, manager);
+		} catch (final Throwable e) {
+			if (client != null) {
+				// clean up
+				client.disconnect();
+			}
+			throw e;
+		}
+	}
+
+	private EndpointDescription findEndpoint(final EndpointDescription[] endpoints) {
+		EndpointDescription best = null;
+		for (final EndpointDescription ep : endpoints) {
+			if (best == null || ep.getSecurityLevel().compareTo(best.getSecurityLevel()) > 0) {
+				best = ep;
+			}
+		}
+		return best;
+	}
+
+	protected synchronized void whenConnected(final Worker<Connected> worker) {
+		if (this.connected != null) {
+			try {
+				worker.work(this.connected);
+			} catch (final Exception e) {
+				handleConnectionFailue(e);
+			}
+		}
+	}
+
+	public UInteger registerItem(final NamespaceId namespaceId, final PartialNodeId partialNodeId,
+			final Double samplingInterval, final Consumer<DataValue> valueConsumer) {
+
+		final UInteger clientHandle = Unsigned.uint(this.clientHandleCounter.incrementAndGet());
+		final Subscription subscription = new Subscription(namespaceId, partialNodeId, samplingInterval, valueConsumer);
+
+		synchronized (this) {
+			this.subscriptions.put(clientHandle, subscription);
+
+			whenConnected(connected -> {
+				connected.activate(clientHandle, subscription);
+			});
+		}
+
+		return clientHandle;
+	}
+
+	public synchronized void unregisterItem(final UInteger clientHandle) {
+		if (this.subscriptions.remove(clientHandle) != null) {
+			whenConnected(connected -> {
+				connected.deactivate(clientHandle);
+			});
+		}
+	}
+
+	public void write(final NamespaceId namespaceId, final PartialNodeId partialNodeId, final DataValue value,
+			final boolean await) {
+		CompletableFuture<Object> future = null;
+
+		synchronized (this) {
+			if (this.connected != null) {
+				future = this.connected.write(namespaceId, partialNodeId, value).handleAsync((status, e) -> {
+					// handle outside the lock, running using
+					// handleAsync
+					if (e != null) {
+						handleConnectionFailue(e);
+					}
+					return null;
+				}, this.executor);
+			}
+		}
+
+		if (await && future != null) {
+			try {
+				future.get();
+			} catch (InterruptedException | ExecutionException e) {
+				// should never happen since our previous handler should not
+				// fail
+				LOG.warn("Failed to wait for completion", e);
+			}
+		}
+	}
+
+}