You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/16 15:39:10 UTC

[09/13] camel git commit: CAMEL-10054: Create camel-atomix component

CAMEL-10054: Create camel-atomix component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5d4df8f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5d4df8f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5d4df8f

Branch: refs/heads/master
Commit: a5d4df8fa301e5ba16619eb2176ee4d1a815441c
Parents: abb0ef4
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Jun 2 08:13:14 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Jun 16 17:37:54 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/atomix-cluster-component.adoc |  19 +-
 .../cluster/AtomixClusterConfiguration.java     |  15 +-
 .../atomix/cluster/AtomixClusterHelper.java     |  77 +++++++
 .../component/atomix/ha/AtomixCluster.java      | 157 +++++++++++--
 .../atomix/ha/AtomixClusterService.java         | 224 -------------------
 .../atomix/ha/AtomixRoutePolicyMain.java        |  59 ++---
 6 files changed, 253 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/main/docs/atomix-cluster-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-cluster-component.adoc b/components/camel-atomix/src/main/docs/atomix-cluster-component.adoc
index 80f2f7e..5ae5df1 100644
--- a/components/camel-atomix/src/main/docs/atomix-cluster-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-cluster-component.adoc
@@ -6,22 +6,9 @@ The camel atomix component allows you to work with Atomix, a fault-tolerant dist
 
 
 mvn \
-    -Datomix.cluster="127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003" \
-    -Dexec.mainClass=org.apache.camel.component.atomix.ha.AtomixRoutePolicyMain \
-    -Dexec.classpathScope=test \
-    test-compile \
-    exec:java
-
-mvn \
-    -Datomix.cluster="127.0.0.1:9002,127.0.0.1:9001,127.0.0.1:9003" \
     -Dexec.mainClass=org.apache.camel.component.atomix.ha.AtomixRoutePolicyMain \
     -Dexec.classpathScope=test \
     test-compile \
-    exec:java
-
-mvn \
-    -Datomix.cluster="127.0.0.1:9003,127.0.0.1:9001,127.0.0.1:9002" \
-    -Dexec.mainClass=org.apache.camel.component.atomix.ha.AtomixRoutePolicyMain \
-    -Dexec.classpathScope=test \
-    test-compile \
-    exec:java
\ No newline at end of file
+    exec:java \
+    -Datomix.cluster="127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003" \
+    -Datomix.index="0"

http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
index 5d1cbb1..6a61eb9 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
@@ -17,12 +17,13 @@ package org.apache.camel.component.atomix.cluster;
 
 import io.atomix.AtomixReplica;
 import io.atomix.copycat.server.storage.StorageLevel;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.atomix.AtomixConfiguration;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 
 @UriParams
-public class AtomixClusterConfiguration extends AtomixConfiguration {
+public class AtomixClusterConfiguration extends AtomixConfiguration implements Cloneable {
 
     @UriParam
     private String storagePath;
@@ -82,4 +83,16 @@ public class AtomixClusterConfiguration extends AtomixConfiguration {
     public void setReplica(AtomixReplica replica) {
         this.replica = replica;
     }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixClusterConfiguration copy() {
+        try {
+            return (AtomixClusterConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
new file mode 100644
index 0000000..4e03627
--- /dev/null
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.camel.component.atomix.cluster;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.server.storage.Storage;
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+
+public final class AtomixClusterHelper {
+    private AtomixClusterHelper() {
+    }
+
+    public static AtomixReplica createReplica(CamelContext camelContext, String address, AtomixClusterConfiguration configuration) throws Exception {
+        return createReplica(camelContext, new Address(address), configuration);
+    }
+
+    public static AtomixReplica createReplica(CamelContext camelContext, Address address, AtomixClusterConfiguration configuration) throws Exception {
+        AtomixReplica atomix = configuration.getReplica();
+
+        if (atomix == null) {
+            if (configuration.getReplicaRef() != null) {
+                atomix = CamelContextHelper.mandatoryLookup(camelContext, configuration.getReplicaRef(), AtomixReplica.class);
+            } else {
+                final AtomixReplica.Builder atomixBuilder;
+
+                String uri = configuration.getConfigurationUri();
+                if (ObjectHelper.isNotEmpty(uri)) {
+                    uri = camelContext.resolvePropertyPlaceholders(uri);
+                    try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+                        Properties properties = new Properties();
+                        properties.load(is);
+
+                        atomixBuilder = AtomixReplica.builder(address, properties);
+                    }
+                } else {
+                    atomixBuilder = AtomixReplica.builder(address);
+                }
+
+                Storage.Builder storageBuilder = Storage.builder();
+                ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), storageBuilder::withStorageLevel);
+                ObjectHelper.ifNotEmpty(configuration.getStoragePath(), storageBuilder::withDirectory);
+
+                atomixBuilder.withStorage(storageBuilder.build());
+
+                if (configuration.getTransport() != null) {
+                    atomixBuilder.withTransport(
+                        camelContext.getInjector().newInstance(configuration.getTransport())
+                    );
+                }
+
+                atomix = atomixBuilder.build();
+            }
+        }
+
+        return atomix;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
index 10d53bc..ac8dbc7 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixCluster.java
@@ -16,13 +16,16 @@
  */
 package org.apache.camel.component.atomix.ha;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
+import io.atomix.Atomix;
 import io.atomix.AtomixReplica;
 import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Transport;
+import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.CamelContext;
+import org.apache.camel.component.atomix.cluster.AtomixClusterConfiguration;
+import org.apache.camel.component.atomix.cluster.AtomixClusterHelper;
 import org.apache.camel.impl.ha.AbstractCamelCluster;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -31,30 +34,129 @@ import org.slf4j.LoggerFactory;
 public final class AtomixCluster extends AbstractCamelCluster<AtomixClusterView> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AtomixCluster.class);
 
-    private final AtomixReplica atomix;
-    private final List<Address> addresses;
+    private CamelContext camelContext;
+    private Address address;
+    private AtomixClusterConfiguration configuration;
+    private AtomixReplica atomix;
 
-    public AtomixCluster(AtomixReplica atomix) {
-        this(null, atomix, Collections.emptyList());
+    public AtomixCluster() {
+        super("atomix");
+
+        this.configuration = new AtomixClusterConfiguration();
+    }
+
+    public AtomixCluster(CamelContext camelContext, Address address, AtomixClusterConfiguration configuration) {
+        super("atomix");
+
+        this.camelContext = camelContext;
+        this.address = address;
+        this.configuration = configuration.copy();
+    }
+
+    // **********************************
+    // Properties
+    // **********************************
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public Address getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = new Address(address);
+    }
+
+    public void setAddress(Address address) {
+        this.address = address;
+    }
+
+    public AtomixClusterConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(AtomixClusterConfiguration configuration) {
+        this.configuration = configuration.copy();
+    }
+
+    public String getStoragePath() {
+        return configuration.getStoragePath();
+    }
+
+    public void setStoragePath(String storagePath) {
+        configuration.setStoragePath(storagePath);
+    }
+
+    public List<Address> getNodes() {
+        return configuration.getNodes();
     }
 
-    public AtomixCluster(AtomixReplica atomix, List<Address> addresses) {
-       this(null, atomix, addresses);
+    public StorageLevel getStorageLevel() {
+        return configuration.getStorageLevel();
     }
 
-    public AtomixCluster(CamelContext camelContext, AtomixReplica atomix, List<Address> addresses) {
-        super("camel-atomix", camelContext);
+    public void setNodes(List<Address> nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public void setStorageLevel(StorageLevel storageLevel) {
+        configuration.setStorageLevel(storageLevel);
+    }
+
+    public void setNodes(String nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public Class<? extends Transport> getTransport() {
+        return configuration.getTransport();
+    }
 
-        this.atomix = atomix;
-        this.addresses = new ArrayList<>(addresses);
+    public void setTransport(Class<? extends Transport> transport) {
+        configuration.setTransport(transport);
     }
 
+    public String getReplicaRef() {
+        return configuration.getReplicaRef();
+    }
+
+    public void setReplicaRef(String clusterref) {
+        configuration.setReplicaRef(clusterref);
+    }
+
+    public Atomix getReplica() {
+        return configuration.getReplica();
+    }
+
+    public void setReplica(AtomixReplica replica) {
+        configuration.setReplica(replica);
+    }
+
+    public String getConfigurationUri() {
+        return configuration.getConfigurationUri();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    // *********************************************
+    // Lifecycle
+    // *********************************************
+
     @Override
     protected void doStart() throws Exception {
         // Assume that if addresses are provided the cluster needs be bootstrapped.
-        if (ObjectHelper.isNotEmpty(addresses)) {
-            LOGGER.debug("Bootstrap cluster for nodes: {}", addresses);
-            this.atomix.bootstrap(addresses).join();
+        if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
+            LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
+            getOrCreateAtomix().bootstrap(configuration.getNodes()).join();
             LOGGER.debug("Bootstrap cluster done");
         }
 
@@ -62,7 +164,28 @@ public final class AtomixCluster extends AbstractCamelCluster<AtomixClusterView>
     }
 
     @Override
-    protected AtomixClusterView doCreateView(String namespace) throws Exception {
-        return new AtomixClusterView(this, namespace, atomix);
+    protected AtomixClusterView createView(String namespace) throws Exception {
+        return new AtomixClusterView(this, namespace, getOrCreateAtomix());
+    }
+
+
+    private AtomixReplica getOrCreateAtomix() throws Exception {
+        if (atomix == null) {
+            // Validate parameters
+            ObjectHelper.notNull(camelContext, "Camel Context");
+            ObjectHelper.notNull(address, "Atomix Node Address");
+            ObjectHelper.notNull(configuration, "Atomix Node Configuration");
+
+            atomix = AtomixClusterHelper.createReplica(camelContext, address, configuration);
+
+            // Assume that if addresses are provided the cluster needs to be bootstrapped.
+            if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
+                LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes());
+                this.atomix.bootstrap(configuration.getNodes()).join();
+                LOGGER.debug("Bootstrap cluster done");
+            }
+        }
+
+        return this.atomix;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
deleted file mode 100644
index 61fc817..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-
-import io.atomix.Atomix;
-import io.atomix.AtomixReplica;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.component.atomix.cluster.AtomixClusterConfiguration;
-import org.apache.camel.ha.CamelCluster;
-import org.apache.camel.ha.CamelClusterService;
-import org.apache.camel.ha.CamelClusterView;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ResourceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AtomixClusterService extends ServiceSupport implements CamelContextAware, CamelClusterService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterService.class);
-
-    private final AtomixClusterConfiguration configuration;
-
-    private CamelContext camelContext;
-    private Address address;
-    private AtomixCluster cluster;
-
-    public AtomixClusterService() {
-        this.configuration = new AtomixClusterConfiguration();
-    }
-
-    // **********************************
-    // Properties
-    // **********************************
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    public Address getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = new Address(address);
-    }
-
-    public void setAddress(Address address) {
-        this.address = address;
-    }
-
-    public String getStoragePath() {
-        return configuration.getStoragePath();
-    }
-
-    public void setStoragePath(String storagePath) {
-        configuration.setStoragePath(storagePath);
-    }
-
-    public List<Address> getNodes() {
-        return configuration.getNodes();
-    }
-
-    public StorageLevel getStorageLevel() {
-        return configuration.getStorageLevel();
-    }
-
-    public void setNodes(List<Address> nodes) {
-        configuration.setNodes(nodes);
-    }
-
-    public void setStorageLevel(StorageLevel storageLevel) {
-        configuration.setStorageLevel(storageLevel);
-    }
-
-    public void setNodes(String nodes) {
-        configuration.setNodes(nodes);
-    }
-
-    public Class<? extends Transport> getTransport() {
-        return configuration.getTransport();
-    }
-
-    public void setTransport(Class<? extends Transport> transport) {
-        configuration.setTransport(transport);
-    }
-
-    public String getReplicaRef() {
-        return configuration.getReplicaRef();
-    }
-
-    /**
-     * Set the reference of an instance of {@link AtomixReplica}.
-     * @param clusterref
-     */
-    public void setReplicaRef(String clusterref) {
-        configuration.setReplicaRef(clusterref);
-    }
-
-    public Atomix getReplica() {
-        return configuration.getReplica();
-    }
-
-    /**
-     * Set an instance of {@link AtomixReplica}.
-     * @param replica
-     */
-    public void setReplica(AtomixReplica replica) {
-        configuration.setReplica(replica);
-    }
-
-    public String getConfigurationUri() {
-        return configuration.getConfigurationUri();
-    }
-
-    public void setConfigurationUri(String configurationUri) {
-        configuration.setConfigurationUri(configurationUri);
-    }
-
-    // **********************************
-    // Cluster
-    // **********************************
-
-    @Override
-    public synchronized CamelCluster getCluster() throws Exception  {
-        if (this.cluster == null) {
-            AtomixReplica atomix = configuration.getReplica();
-
-            if (atomix == null) {
-                if (configuration.getReplicaRef() != null) {
-                    atomix = CamelContextHelper.mandatoryLookup(camelContext, configuration.getReplicaRef(), AtomixReplica.class);
-                } else {
-                    ObjectHelper.notNull(this.address, "Atomix Address");
-
-                    final AtomixReplica.Builder atomixBuilder;
-
-                    String uri = configuration.getConfigurationUri();
-                    if (ObjectHelper.isNotEmpty(uri)) {
-                        uri = camelContext.resolvePropertyPlaceholders(uri);
-                        try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
-                            Properties properties = new Properties();
-                            properties.load(is);
-
-                            atomixBuilder = AtomixReplica.builder(this.address, properties);
-                        }
-                    } else {
-                        atomixBuilder = AtomixReplica.builder(this.address);
-                    }
-
-                    Storage.Builder storageBuilder = Storage.builder();
-                    ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), storageBuilder::withStorageLevel);
-                    ObjectHelper.ifNotEmpty(configuration.getStoragePath(), storageBuilder::withDirectory);
-
-                    atomixBuilder.withStorage(storageBuilder.build());
-
-                    if (configuration.getTransport() != null) {
-                        atomixBuilder.withTransport(
-                            camelContext.getInjector().newInstance(configuration.getTransport())
-                        );
-                    }
-
-                    atomix = atomixBuilder.build();
-                }
-            }
-
-            this.cluster = new AtomixCluster(atomix, configuration.getNodes());
-            this.cluster.setCamelContext(camelContext);
-        }
-
-        return this.cluster;
-    }
-
-    @Override
-    public CamelClusterView createView(String namespace) throws Exception {
-        return getCluster().createView(namespace);
-    }
-
-    // **********************************
-    // Service
-    // **********************************
-
-    @Override
-    protected void doStart() throws Exception {
-        LOGGER.debug("Starting cluster on address {}", address);
-        getCluster().start();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (this.cluster != null) {
-            this.cluster.stop();
-            this.cluster = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/a5d4df8f/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
index e622900..e0e253f 100644
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
@@ -16,19 +16,14 @@
  */
 package org.apache.camel.component.atomix.ha;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.ha.CamelClusterService;
-import org.apache.camel.ha.CamelClusterView;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicy;
-import org.apache.camel.spi.RoutePolicy;
-import org.apache.camel.util.FileUtil;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,58 +31,42 @@ public final class AtomixRoutePolicyMain {
     private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyMain.class);
 
     public static void main(final String[] args) throws Exception {
-        String[] addresses = System.getProperty("atomix.cluster").split(",");
+        final Integer index = Integer.getInteger("atomix.index");
+        final String[] addresses = System.getProperty("atomix.cluster").split(",");
 
-        List<Address> cluster = new ArrayList<>();
+        List<Address> nodes = new ArrayList<>();
         for (int i = 0; i < addresses.length; i++) {
             String[] parts = addresses[i].split(":");
-            cluster.add(new Address(parts[0], Integer.valueOf(parts[1])));
+            nodes.add(new Address(parts[0], Integer.valueOf(parts[1])));
         }
 
-        final String id = String.format("atomix-%d", cluster.get(0).port());
-        final File path = new File("target", id);
-
-        // Cleanup
-        FileUtil.removeDir(path);
-
-        AtomixClusterService service = new AtomixClusterService();
-        service.setStoragePath(path.getAbsolutePath());
-        service.setStorageLevel(StorageLevel.DISK);
-        service.setAddress(cluster.get(0));
-        service.setNodes(cluster);
+        AtomixCluster cluster = new AtomixCluster();
+        cluster.setStorageLevel(StorageLevel.MEMORY);
+        cluster.setAddress(nodes.get(index));
+        cluster.setNodes(nodes);
 
         DefaultCamelContext context = new DefaultCamelContext();
-        context.addService(service);
+        context.addService(cluster);
+        context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                CamelClusterService cluster = getContext().hasService(AtomixClusterService.class);
-                CamelClusterView view = cluster.createView("my-view");
-                RoutePolicy policy = ClusteredRoutePolicy.forView(view);
-
-                fromF("timer:%s-1?period=2s", id)
-                    .routeId(id + "-1")
-                    .routePolicy(policy)
+                fromF("timer:atomix-%d-1?period=2s", nodes.get(index).port())
                     .log("${routeId} (1)");
-                fromF("timer:%s-2?period=5s", id)
-                    .routeId(id + "-2")
-                    .routePolicy(policy)
+                fromF("timer:atomix-%d-2?period=5s", nodes.get(index).port())
                     .log("${routeId} (2)");
             }
         });
 
         context.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                try {
-                    context.stop();
-                } catch (Exception e) {
-                    LOGGER.warn("", e);
-                }
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                context.stop();
+            } catch (Exception e) {
+                LOGGER.warn("", e);
             }
-        });
+        }));
 
         for (int i = 0; i < Integer.MAX_VALUE; i++) {
             Thread.sleep(1000);