You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/26 15:30:19 UTC
[1/2] camel git commit: camel-atomix: makes cluster service simpler
Repository: camel
Updated Branches:
refs/heads/master 2993c5145 -> 37a15c727
camel-atomix: makes cluster service simpler
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7384488
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7384488
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7384488
Branch: refs/heads/master
Commit: c73844880d659267fef9f662a1459c0ca3b6dd4f
Parents: 2993c51
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Jun 26 15:26:25 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Jun 26 15:26:25 2017 +0200
----------------------------------------------------------------------
.../atomix/AtomixConfigurationAware.java | 29 --------------------
.../atomix/ha/AtomixClusterClientService.java | 7 ++---
.../atomix/ha/AtomixClusterService.java | 7 ++---
.../component/atomix/ha/AtomixClusterView.java | 8 ++----
4 files changed, 7 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
deleted file mode 100644
index b13de80..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix;
-
-public interface AtomixConfigurationAware<C extends AtomixConfiguration> {
- /**
- * @return the configuration
- */
- C getConfiguration();
-
- /**
- * @param configuration the configuration
- */
- void setConfiguration(C configuration);
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
index 00695c0..cc135ca 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
@@ -22,7 +22,6 @@ import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
import org.apache.camel.component.atomix.client.AtomixClientHelper;
import org.apache.camel.impl.ha.AbstractCamelClusterService;
@@ -30,7 +29,7 @@ import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AtomixClusterClientService extends AbstractCamelClusterService<AtomixClusterView> implements AtomixConfigurationAware<AtomixClientConfiguration> {
+public final class AtomixClusterClientService extends AbstractCamelClusterService<AtomixClusterView> {
private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterClientService.class);
private AtomixClientConfiguration configuration;
private AtomixClient atomix;
@@ -49,12 +48,10 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic
// Properties
// **********************************
- @Override
public AtomixClientConfiguration getConfiguration() {
return configuration;
}
- @Override
public void setConfiguration(AtomixClientConfiguration configuration) {
this.configuration = configuration.copy();
}
@@ -109,7 +106,7 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic
@Override
protected AtomixClusterView createView(String namespace) throws Exception {
- return new AtomixClusterView(this, namespace, getOrCreateClient());
+ return new AtomixClusterView(this, namespace, getOrCreateClient(), configuration);
}
private AtomixClient getOrCreateClient() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
index a5a8e51..a7901ce 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -23,13 +23,12 @@ import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.server.storage.StorageLevel;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
import org.apache.camel.impl.ha.AbstractCamelClusterService;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> implements AtomixConfigurationAware<AtomixClusterConfiguration> {
+public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> {
private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterService.class);
private Address address;
@@ -63,12 +62,10 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom
this.address = address;
}
- @Override
public AtomixClusterConfiguration getConfiguration() {
return configuration;
}
- @Override
public void setConfiguration(AtomixClusterConfiguration configuration) {
this.configuration = configuration.copy();
}
@@ -139,7 +136,7 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom
@Override
protected AtomixClusterView createView(String namespace) throws Exception {
- return new AtomixClusterView(this, namespace, getOrCreateReplica());
+ return new AtomixClusterView(this, namespace, getOrCreateReplica(), configuration);
}
private AtomixReplica getOrCreateReplica() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/c7384488/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index 62104a7..c7bbf2d 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -27,7 +27,6 @@ import io.atomix.group.DistributedGroup;
import io.atomix.group.GroupMember;
import io.atomix.group.LocalMember;
import org.apache.camel.component.atomix.AtomixConfiguration;
-import org.apache.camel.component.atomix.AtomixConfigurationAware;
import org.apache.camel.ha.CamelClusterMember;
import org.apache.camel.ha.CamelClusterService;
import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -40,12 +39,14 @@ final class AtomixClusterView extends AbstractCamelClusterView {
private final Atomix atomix;
private final AtomixLocalMember localMember;
+ private final AtomixConfiguration<?> configuration;
private DistributedGroup group;
- AtomixClusterView(CamelClusterService cluster, String namespace, Atomix atomix) {
+ AtomixClusterView(CamelClusterService cluster, String namespace, Atomix atomix, AtomixConfiguration<?> configuration) {
super(cluster, namespace);
this.atomix = atomix;
+ this.configuration = configuration;
this.localMember = new AtomixLocalMember();
}
@@ -89,9 +90,6 @@ final class AtomixClusterView extends AbstractCamelClusterView {
if (!localMember.hasJoined()) {
LOGGER.debug("Get group {}", getNamespace());
- final AtomixConfigurationAware service = AtomixConfigurationAware.class.cast(getClusterService());
- final AtomixConfiguration<?> configuration = service.getConfiguration();
-
group = this.atomix.getGroup(
getNamespace(),
new DistributedGroup.Config(configuration.getResourceConfig(getNamespace())),
[2/2] camel git commit: camel-zookeeper: create a zookeeper cluster-service
Posted by lb...@apache.org.
camel-zookeeper: create a zookeeper cluster-service
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37a15c72
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37a15c72
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37a15c72
Branch: refs/heads/master
Commit: 37a15c72716dffe37562a4893b34d21a1dc8dd7f
Parents: c738448
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Jun 26 15:27:13 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Jun 26 17:21:42 2017 +0200
----------------------------------------------------------------------
.../zookeeper/ha/ZooKeeperClusterService.java | 279 +++++++++++++++++++
.../zookeeper/ha/ZooKeeperClusterView.java | 166 +++++++++++
.../zookeeper/ZooKeeperTestSupport.java | 2 +-
.../ha/ZooKeeperClientRoutePolicyTest.java | 122 ++++++++
.../src/test/resources/log4j2.properties | 2 +-
5 files changed, 569 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
new file mode 100644
index 0000000..389d83c
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeeperClusterView> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterService.class);
+
+ private CuratorFramework client;
+ private List<String> nodes;
+ private String namespace;
+ private long reconnectBaseSleepTime;
+ private TimeUnit reconnectBaseSleepTimeUnit;
+ private int reconnectMaxRetries;
+ private long sessionTimeout;
+ private TimeUnit sessionTimeoutUnit;
+ private long connectionTimeout;
+ private TimeUnit connectionTimeotUnit;
+ private List<AuthInfo> authInfoList;
+ private long maxCloseWait;
+ private TimeUnit maxCloseWaitUnit;
+ private boolean closeOnStop;
+ private RetryPolicy retryPolicy;
+
+ public ZooKeeperClusterService() {
+ this.reconnectBaseSleepTime = 1000;
+ this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS;
+ this.reconnectMaxRetries = 3;
+ this.closeOnStop = true;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
+ this.sessionTimeoutUnit = TimeUnit.MILLISECONDS;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
+ this.connectionTimeotUnit = TimeUnit.MILLISECONDS;
+
+ // from org.apache.curator.framework.CuratorFrameworkFactory
+ this.maxCloseWait = 1000;
+ this.maxCloseWaitUnit = TimeUnit.MILLISECONDS;
+ }
+
+ // *********************************************
+ // Properties
+ // *********************************************
+
+ public CuratorFramework getClient() {
+ return client;
+ }
+
+ public void setClient(CuratorFramework client) {
+ this.client = client;
+ }
+
+ public List<String> getNodes() {
+ return nodes;
+ }
+
+ public void setNodes(String nodes) {
+ this.nodes = Collections.unmodifiableList(
+ Arrays.stream(nodes.split(",")).collect(Collectors.toList())
+ );
+ }
+
+ public void setNodes(List<String> nodes) {
+ this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public long getReconnectBaseSleepTime() {
+ return reconnectBaseSleepTime;
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) {
+ this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ }
+
+ public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) {
+ this.reconnectBaseSleepTime = reconnectBaseSleepTime;
+ this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ }
+
+ public TimeUnit getReconnectBaseSleepTimeUnit() {
+ return reconnectBaseSleepTimeUnit;
+ }
+
+ public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) {
+ this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit;
+ }
+
+ public int getReconnectMaxRetries() {
+ return reconnectMaxRetries;
+ }
+
+ public void setReconnectMaxRetries(int reconnectMaxRetries) {
+ this.reconnectMaxRetries = reconnectMaxRetries;
+ }
+
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) {
+ this.sessionTimeout = sessionTimeout;
+ this.sessionTimeoutUnit = sessionTimeoutUnit;
+ }
+
+ public TimeUnit getSessionTimeoutUnit() {
+ return sessionTimeoutUnit;
+ }
+
+ public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) {
+ this.sessionTimeoutUnit = sessionTimeoutUnit;
+ }
+
+ public long getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(long connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) {
+ this.connectionTimeout = connectionTimeout;
+ this.connectionTimeotUnit = connectionTimeotUnit;
+ }
+
+ public TimeUnit getConnectionTimeotUnit() {
+ return connectionTimeotUnit;
+ }
+
+ public void setConnectionTimeotUnit(TimeUnit connectionTimeotUnit) {
+ this.connectionTimeotUnit = connectionTimeotUnit;
+ }
+
+ public List<AuthInfo> getAuthInfoList() {
+ return authInfoList;
+ }
+
+ public void setAuthInfoList(List<AuthInfo> authInfoList) {
+ this.authInfoList = authInfoList;
+ }
+
+ public long getMaxCloseWait() {
+ return maxCloseWait;
+ }
+
+ public void setMaxCloseWait(long maxCloseWait) {
+ this.maxCloseWait = maxCloseWait;
+ }
+
+ public TimeUnit getMaxCloseWaitUnit() {
+ return maxCloseWaitUnit;
+ }
+
+ public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) {
+ this.maxCloseWaitUnit = maxCloseWaitUnit;
+ }
+
+ public boolean isCloseOnStop() {
+ return closeOnStop;
+ }
+
+ public void setCloseOnStop(boolean closeOnStop) {
+ this.closeOnStop = closeOnStop;
+ }
+
+ public RetryPolicy getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ public void setRetryPolicy(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ }
+
+ // *********************************************
+ //
+ // *********************************************
+
+ @Override
+ protected ZooKeeperClusterView createView(String namespace) throws Exception {
+ return new ZooKeeperClusterView(this, getOrCreateClient(), namespace);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // instantiate a new CuratorFramework
+ getOrCreateClient();
+
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ if (client != null && closeOnStop) {
+ client.close();
+ }
+ }
+
+ private CuratorFramework getOrCreateClient() throws Exception {
+ if (client == null) {
+ // Validate parameters
+ ObjectHelper.notNull(getCamelContext(), "Camel Context");
+ ObjectHelper.notNull(nodes, "ZooKeeper Nodes");
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .connectString(String.join(",", nodes))
+ .sessionTimeoutMs((int)sessionTimeoutUnit.toMillis(sessionTimeout))
+ .connectionTimeoutMs((int)connectionTimeotUnit.toMillis(connectionTimeout))
+ .maxCloseWaitMs((int)maxCloseWaitUnit.toMillis(maxCloseWait))
+ .retryPolicy(retryPolicy());
+
+ Optional.ofNullable(namespace).ifPresent(builder::namespace);
+ Optional.ofNullable(authInfoList).ifPresent(builder::authorization);
+
+ LOGGER.debug("Connect to ZooKeeper with namespace {}, nodes: {}", namespace, nodes);
+ client = builder.build();
+
+ LOGGER.debug("Starting ZooKeeper client");
+ client.start();
+ }
+
+ return this.client;
+ }
+
+ private RetryPolicy retryPolicy() {
+ return retryPolicy != null
+ ? retryPolicy
+ : new ExponentialBackoffRetry(
+ (int)reconnectBaseSleepTimeUnit.toMillis(reconnectBaseSleepTime),
+ reconnectMaxRetries);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
new file mode 100644
index 0000000..86e530f
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ZooKeeperClusterView extends AbstractCamelClusterView {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterView.class);
+
+ private final CuratorFramework client;
+ private final CuratorLocalMember localMember;
+ private LeaderSelector leaderSelector;
+
+ public ZooKeeperClusterView(CamelClusterService cluster, CuratorFramework client, String namespace) {
+ super(cluster, namespace);
+
+ this.localMember = new CuratorLocalMember();
+ this.client = client;
+ }
+
+ @Override
+ public CamelClusterMember getLocalMember() {
+ return this.localMember;
+ }
+
+ @Override
+ public Optional<CamelClusterMember> getMaster() {
+ if (leaderSelector == null) {
+ return Optional.empty();
+ }
+
+ try {
+ return Optional.of(new CuratorClusterMember(leaderSelector.getLeader()));
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ @Override
+ public List<CamelClusterMember> getMembers() {
+ if (leaderSelector == null) {
+ return Collections.emptyList();
+ }
+
+ try {
+ return leaderSelector.getParticipants()
+ .stream()
+ .map(CuratorClusterMember::new)
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (leaderSelector == null) {
+ leaderSelector = new LeaderSelector(client, getNamespace(), new CamelLeaderElectionListener());
+ leaderSelector.setId(getClusterService().getId());
+ leaderSelector.start();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (leaderSelector != null) {
+ leaderSelector.interruptLeadership();
+ leaderSelector.close();
+ leaderSelector = null;
+ }
+ }
+
+ class CamelLeaderElectionListener extends LeaderSelectorListenerAdapter {
+ @Override
+ public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
+ localMember.setMaster(true);
+ fireLeadershipChangedEvent(localMember);
+
+ while (isRunAllowed()) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ break;
+ }
+ }
+
+ localMember.setMaster(false);
+ getMaster().ifPresent(leader -> fireLeadershipChangedEvent(leader));
+ }
+ }
+
+ // ***********************************************
+ //
+ // ***********************************************
+
+ private final class CuratorLocalMember implements CamelClusterMember {
+ private AtomicBoolean master = new AtomicBoolean(false);
+
+ void setMaster(boolean master) {
+ this.master.set(master);
+ }
+
+ @Override
+ public boolean isMaster() {
+ return master.get();
+ }
+
+ @Override
+ public String getId() {
+ return getClusterService().getId();
+ }
+ }
+
+ private final class CuratorClusterMember implements CamelClusterMember {
+ private final Participant participant;
+
+ CuratorClusterMember(Participant participant) {
+ this.participant = participant;
+ }
+
+ @Override
+ public String getId() {
+ return participant.getId();
+ }
+
+ @Override
+ public boolean isMaster() {
+ try {
+ return leaderSelector.getLeader().equals(this.participant);
+ } catch (Exception e) {
+ LOGGER.debug("", e);
+ return false;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
index 6225e60..ae27c78 100644
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
@@ -225,7 +225,7 @@ public class ZooKeeperTestSupport extends CamelTestSupport {
// Wait methods are taken directly from the Zookeeper tests. A tests jar
// would be nice! Another good reason the keeper folks should move to maven.
- private static boolean waitForServerUp(String hp, long timeout) {
+ public static boolean waitForServerUp(String hp, long timeout) {
long start = System.currentTimeMillis();
while (true) {
try {
http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
new file mode 100644
index 0000000..564940d
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClientRoutePolicyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ZooKeeperClientRoutePolicyTest {
+ private static final int PORT = AvailablePortFinder.getNextAvailable();
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClientRoutePolicyTest.class);
+ private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+ private static final List<String> RESULTS = new ArrayList<>();
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+ private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+
+ // ************************************
+ // Test
+ // ************************************
+
+ @Test
+ public void test() throws Exception {
+ TestZookeeperServer server = null;
+
+ try {
+ server = new TestZookeeperServer(PORT, true);
+ ZooKeeperTestSupport.waitForServerUp("localhost:" + PORT, 1000);
+
+ for (String id : CLIENTS) {
+ SCHEDULER.submit(() -> run(id));
+ }
+
+ LATCH.await(1, TimeUnit.MINUTES);
+ SCHEDULER.shutdownNow();
+
+ Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+ Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+ } finally {
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
+
+ // ************************************
+ // Run a Camel node
+ // ************************************
+
+ private static void run(String id) {
+ try {
+ CountDownLatch contextLatch = new CountDownLatch(1);
+
+ ZooKeeperClusterService service = new ZooKeeperClusterService();
+ service.setId("node-" + id);
+ service.setNodes("localhost:" + PORT);
+ service.setNamespace(null );
+
+ DefaultCamelContext context = new DefaultCamelContext();
+ context.disableJMX();
+ context.setName("context-" + id);
+ context.addService(service);
+ context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns"));
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("timer:zookeeper?delay=1s&period=1s&repeatCount=1")
+ .routeId("route-" + id)
+ .process(e -> {
+ LOGGER.debug("Node {} done", id);
+ RESULTS.add(id);
+ // Shutdown the context later on to give a chance to
+ // other members to catch-up
+ SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+ });
+ }
+ });
+
+ // Start the context after some random time so the startup order
+ // changes for each test.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+ context.start();
+
+ contextLatch.await();
+ context.stop();
+
+ LATCH.countDown();
+ } catch (Exception e) {
+ LOGGER.warn("", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/37a15c72/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index d7bb987..2e7cfa5 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -43,6 +43,6 @@ logger.springframework.name = org.springframework
logger.springframework.level = WARN
rootLogger.level = INFO
#rootLogger.appenderRef.stdout.ref = out
-rootLogger.appenderRef.file.ref = file
+rootLogger.appenderRef.file.ref = out