You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 03:05:54 UTC
[4/5] curator git commit: Merge branch 'master' into CURATOR-287
Merge branch 'master' into CURATOR-287
Conflicts:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/649e0ba2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/649e0ba2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/649e0ba2
Branch: refs/heads/CURATOR-3.0
Commit: 649e0ba24b4123829e1267c755318a288f6f9e0c
Parents: 55ca08c b08b543
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 19:10:14 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 19:10:14 2016 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 4 ++
.../apache/curator/CuratorZookeeperClient.java | 2 +
.../main/java/org/apache/curator/RetryLoop.java | 2 +
.../apache/curator/SessionFailRetryLoop.java | 2 +
.../exhibitor/ExhibitorEnsembleProvider.java | 2 +
.../org/apache/curator/utils/ThreadUtils.java | 23 +++++++
.../java/org/apache/curator/utils/ZKPaths.java | 2 +-
.../src/main/java/locking/LockingExample.java | 7 ++-
.../framework/api/VersionPathAndBytesable.java | 25 ++++++++
.../transaction/TransactionCreateBuilder.java | 5 +-
.../transaction/TransactionSetDataBuilder.java | 4 +-
.../curator/framework/imps/Backgrounding.java | 2 +
.../framework/imps/CreateBuilderImpl.java | 15 +++--
.../framework/imps/CuratorFrameworkImpl.java | 46 +++++++++-----
.../framework/imps/DeleteBuilderImpl.java | 2 +
.../framework/imps/FailedDeleteManager.java | 5 +-
.../FindAndDeleteProtectedNodeInBackground.java | 3 +
.../framework/imps/GetDataBuilderImpl.java | 2 +
.../curator/framework/imps/NamespaceImpl.java | 2 +
.../framework/imps/NamespaceWatcher.java | 2 +
.../framework/imps/OperationAndData.java | 11 +++-
.../framework/imps/SetDataBuilderImpl.java | 8 +--
.../framework/listen/ListenerContainer.java | 2 +
.../framework/state/ConnectionStateManager.java | 14 +++--
.../framework/imps/TestTransactions.java | 46 +++++++++++++-
.../recipes/AfterConnectionEstablished.java | 1 +
.../recipes/cache/DefaultTreeCacheSelector.java | 37 +++++++++++
.../framework/recipes/cache/NodeCache.java | 4 ++
.../recipes/cache/PathChildrenCache.java | 4 ++
.../framework/recipes/cache/TreeCache.java | 45 +++++++++----
.../recipes/cache/TreeCacheSelector.java | 66 ++++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 5 ++
.../recipes/leader/LeaderSelector.java | 10 ++-
.../framework/recipes/locks/ChildReaper.java | 1 +
.../recipes/locks/InterProcessMultiLock.java | 4 ++
.../recipes/locks/InterProcessSemaphore.java | 4 ++
.../recipes/locks/InterProcessSemaphoreV2.java | 2 +
.../framework/recipes/locks/LockInternals.java | 2 +
.../curator/framework/recipes/locks/Reaper.java | 1 +
.../framework/recipes/nodes/GroupMember.java | 3 +
.../framework/recipes/nodes/PersistentNode.java | 3 +
.../recipes/queue/DistributedQueue.java | 43 ++++++++-----
.../framework/recipes/queue/QueueSharder.java | 16 +++--
.../framework/recipes/shared/SharedValue.java | 2 +
...estResetConnectionWithBackgroundFailure.java | 36 +++++------
.../framework/recipes/cache/TestTreeCache.java | 57 +++++++++++++++--
.../framework/recipes/locks/TestLockACLs.java | 50 ++++++++++++---
.../curator/test/TestingZooKeeperMain.java | 31 ++++++++-
.../entity/JsonServiceInstanceMarshaller.java | 3 +
.../entity/JsonServiceInstancesMarshaller.java | 2 +
.../server/rest/DiscoveryResource.java | 6 ++
.../discovery/server/rest/InstanceCleanup.java | 2 +
.../discovery/details/ServiceDiscoveryImpl.java | 3 +
.../x/rpc/idl/discovery/DiscoveryService.java | 8 +++
.../idl/discovery/DiscoveryServiceLowLevel.java | 7 +++
.../idl/services/CuratorProjectionService.java | 25 ++++++++
56 files changed, 606 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/649e0ba2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index cf0bf38,0000000..0d7ab9d
mode 100644,000000..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,382 -1,0 +1,385 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
++import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent node is a node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentNode implements Closeable
+{
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1)); // swapped to null by initialisationComplete() once initialisation is done
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod; // create builder prepared once in the constructor, with or without protection
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null); // null until a background create reports a path; reset to null by deleteNode()
+ private final String basePath; // caller-supplied path, validated in the constructor
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); // holds a private copy of the bytes to write to the node
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false); // set when a create returns NOAUTH, cleared once a create yields a path
+ private final BackgroundCallback backgroundCallback; // handles the results of the background create calls
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher() // watcher that watchNode() attaches to its checkExists call
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == EventType.NodeDeleted )
+ {
+ createNode(); // the node is gone: request it again
+ }
+ else if ( event.getType() == EventType.NodeDataChanged )
+ {
+ watchNode(); // issue a fresh checkExists with this watcher attached
+ }
+ }
+ };
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback() // receives the result of the checkExists issued by watchNode()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
+ createNode(); // the node does not exist: create it
+ }
+ else
+ {
+ boolean isEphemeral = event.getStat().getEphemeralOwner() != 0; // NOTE(review): runs for every non-NONODE result code; assumes getStat() is non-null - confirm for error results
+ if ( isEphemeral != mode.isEphemeral() )
+ {
+ log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
+ }
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new BackgroundCallback() // used for the setData that the create callback issues when the node already existed
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ //If the result is OK then initialisation is complete (if we are still initialising).
+ //Other errors are not retried here: the only recoverable cases are connection loss
+ //and the node not existing, and both of those are already handled by other watches.
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ //The update succeeded; release the initial-create latch if it is still pending.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() // registered in start(), removed in close()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED ) // all other state changes are ignored here
+ {
+ createNode(); // re-issue the background create after a reconnect
+ }
+ }
+ };
+
+ private enum State // lifecycle of this instance; only moves LATENT -> STARTED -> CLOSED
+ {
+ LATENT, // initial value, start() has not been called yet
+ STARTED, // set by start(); the only state for which isActive() returns true
+ CLOSED // set by close()
+ }
+
+ /**
+ * @param client client instance, must not be null
+ * @param mode creation mode, must not be null
+ * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+ * @param basePath the base path for the node, must pass path validation
+ * @param initData data for the node, must not be null (a copy is stored)
+ */
+ public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null"); // local that shadows the 'data' field until the copy below
+
+ backgroundCallback = new BackgroundCallback() // processes the result of each background create started by createNode()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath(); // node was already present: adopt the path reported by the event
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName(); // create succeeded: adopt the name reported by the event
+ }
+ else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+ {
+ log.warn("Client does not have authorisation to write node at path {}", event.getPath());
+ authFailure.set(true);
+ return; // NOAUTH is recorded but not retried from here
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+ client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData()); // overwrite the existing node's data; setDataCallback finishes initialisation
+ }
+ else
+ {
+ initialisationComplete(); // freshly created with our data: nothing else to write
+ }
+ }
+ else
+ {
+ createNode(); // any other result code (or no path in the event): issue the create again
+ }
+ }
+ };
+
+ createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length)); // defensive copy so later changes to the caller's array are not seen
+ }
+
+ private void initialisationComplete() // releases threads blocked in waitForInitialCreate(); later calls do nothing
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null); // atomically take the latch so it is counted down at most once
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent node. An attempt to create the node
+ * in the background will be started. May only be called once per instance.
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); // fails unless the state was still LATENT
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until either the initial node creation initiated by {@link #start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return true if the node was created before the timeout elapsed, false otherwise
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit); // a null latch means initialisation has already completed
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return; // never started or already closed: nothing to do
+ }
+
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+ try
+ {
+ deleteNode();
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e); // presumably restores the interrupt flag when e is an InterruptedException - confirm in ThreadUtils
+ throw new IOException(e); // surface any delete failure as the IOException that Closeable allows
+ }
+ }
+
+ /**
+ * Returns the currently set path or null if no path is currently recorded
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get(); // set by the create callback, cleared by deleteNode()
+ }
+
+ /**
+ * Set the data that this node should hold in ZK; if started, also writes the data to the node
+ *
+ * @param data new data value, must not be null
+ * @throws Exception errors
+ */
+ public void setData(byte[] data) throws Exception
+ {
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+ this.data.set(Arrays.copyOf(data, data.length)); // store a private copy of the caller's array
+ if ( isActive() )
+ {
+ client.setData().inBackground().forPath(getActualPath(), getData()); // NOTE(review): getActualPath() is null until a create has completed - confirm forPath copes with that
+ }
+ }
+
+ /**
+ * Return the current value of our data
+ *
+ * @return our data
+ */
+ public byte[] getData()
+ {
+ return this.data.get(); // NOTE: hands back the stored array itself, not a copy
+ }
+
+ private void deleteNode() throws Exception // deletes the node at the recorded path, if there is one
+ {
+ String localNodePath = nodePath.getAndSet(null); // clear the recorded path first so each path is deleted at most once
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // the node is already gone, so there is nothing left to delete
+ }
+ }
+ }
+
+ private void createNode() // starts a background create; the result is handled by backgroundCallback
+ {
+ if ( !isActive() )
+ {
+ return; // not started or already closed: do not touch ZK
+ }
+
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null && !useProtection) ? existingPath : basePath; // reuse the recorded path unless protection is on, in which case start from basePath
+ createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e); // presumably restores the interrupt flag when e is an InterruptedException - confirm in ThreadUtils
+ throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
+ }
+ }
+
+ private CreateMode getCreateMode(boolean pathIsSet) // picks the mode for the next create; sequential modes lose the sequential part once a path is recorded
+ {
+ if ( pathIsSet )
+ {
+ switch ( mode )
+ {
+ default:
+ {
+ break; // non-sequential modes fall through to 'return mode' below
+ }
+
+ case EPHEMERAL_SEQUENTIAL:
+ {
+ return CreateMode.EPHEMERAL; // protection case - node already set
+ }
+
+ case PERSISTENT_SEQUENTIAL:
+ {
+ return CreateMode.PERSISTENT; // protection case - node already set
+ }
+ }
+ }
+ return mode; // no path recorded yet, or a non-sequential mode: use the configured mode as is
+ }
+
+ private void watchNode() throws Exception // attaches 'watcher' to the recorded path through a background checkExists
+ {
+ if ( !isActive() )
+ {
+ return; // not started or already closed: do not register anything
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null ) // nothing to watch while no path is recorded
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ }
+
+ private boolean isActive() // true only between start() and close()
+ {
+ return (state.get() == State.STARTED);
+ }
+
+ @VisibleForTesting
+ boolean isAuthFailure() // package-private accessor for the NOAUTH flag maintained by the create callback
+ {
+ return authFailure.get();
+ }
+}