You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 01:10:52 UTC

[4/4] curator git commit: Merge branch 'master' into CURATOR-287

Merge branch 'master' into CURATOR-287

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/649e0ba2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/649e0ba2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/649e0ba2

Branch: refs/heads/master
Commit: 649e0ba24b4123829e1267c755318a288f6f9e0c
Parents: 55ca08c b08b543
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 19:10:14 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 19:10:14 2016 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  4 ++
 .../apache/curator/CuratorZookeeperClient.java  |  2 +
 .../main/java/org/apache/curator/RetryLoop.java |  2 +
 .../apache/curator/SessionFailRetryLoop.java    |  2 +
 .../exhibitor/ExhibitorEnsembleProvider.java    |  2 +
 .../org/apache/curator/utils/ThreadUtils.java   | 23 +++++++
 .../java/org/apache/curator/utils/ZKPaths.java  |  2 +-
 .../src/main/java/locking/LockingExample.java   |  7 ++-
 .../framework/api/VersionPathAndBytesable.java  | 25 ++++++++
 .../transaction/TransactionCreateBuilder.java   |  5 +-
 .../transaction/TransactionSetDataBuilder.java  |  4 +-
 .../curator/framework/imps/Backgrounding.java   |  2 +
 .../framework/imps/CreateBuilderImpl.java       | 15 +++--
 .../framework/imps/CuratorFrameworkImpl.java    | 46 +++++++++-----
 .../framework/imps/DeleteBuilderImpl.java       |  2 +
 .../framework/imps/FailedDeleteManager.java     |  5 +-
 .../FindAndDeleteProtectedNodeInBackground.java |  3 +
 .../framework/imps/GetDataBuilderImpl.java      |  2 +
 .../curator/framework/imps/NamespaceImpl.java   |  2 +
 .../framework/imps/NamespaceWatcher.java        |  2 +
 .../framework/imps/OperationAndData.java        | 11 +++-
 .../framework/imps/SetDataBuilderImpl.java      |  8 +--
 .../framework/listen/ListenerContainer.java     |  2 +
 .../framework/state/ConnectionStateManager.java | 14 +++--
 .../framework/imps/TestTransactions.java        | 46 +++++++++++++-
 .../recipes/AfterConnectionEstablished.java     |  1 +
 .../recipes/cache/DefaultTreeCacheSelector.java | 37 +++++++++++
 .../framework/recipes/cache/NodeCache.java      |  4 ++
 .../recipes/cache/PathChildrenCache.java        |  4 ++
 .../framework/recipes/cache/TreeCache.java      | 45 +++++++++----
 .../recipes/cache/TreeCacheSelector.java        | 66 ++++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   |  5 ++
 .../recipes/leader/LeaderSelector.java          | 10 ++-
 .../framework/recipes/locks/ChildReaper.java    |  1 +
 .../recipes/locks/InterProcessMultiLock.java    |  4 ++
 .../recipes/locks/InterProcessSemaphore.java    |  4 ++
 .../recipes/locks/InterProcessSemaphoreV2.java  |  2 +
 .../framework/recipes/locks/LockInternals.java  |  2 +
 .../curator/framework/recipes/locks/Reaper.java |  1 +
 .../framework/recipes/nodes/GroupMember.java    |  3 +
 .../framework/recipes/nodes/PersistentNode.java |  3 +
 .../recipes/queue/DistributedQueue.java         | 43 ++++++++-----
 .../framework/recipes/queue/QueueSharder.java   | 16 +++--
 .../framework/recipes/shared/SharedValue.java   |  2 +
 ...estResetConnectionWithBackgroundFailure.java | 36 +++++------
 .../framework/recipes/cache/TestTreeCache.java  | 57 +++++++++++++++--
 .../framework/recipes/locks/TestLockACLs.java   | 50 ++++++++++++---
 .../curator/test/TestingZooKeeperMain.java      | 31 ++++++++-
 .../entity/JsonServiceInstanceMarshaller.java   |  3 +
 .../entity/JsonServiceInstancesMarshaller.java  |  2 +
 .../server/rest/DiscoveryResource.java          |  6 ++
 .../discovery/server/rest/InstanceCleanup.java  |  2 +
 .../discovery/details/ServiceDiscoveryImpl.java |  3 +
 .../x/rpc/idl/discovery/DiscoveryService.java   |  8 +++
 .../idl/discovery/DiscoveryServiceLowLevel.java |  7 +++
 .../idl/services/CuratorProjectionService.java  | 25 ++++++++
 56 files changed, 606 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/649e0ba2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index cf0bf38,0000000..0d7ab9d
mode 100644,000000..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,382 -1,0 +1,385 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.curator.framework.recipes.nodes;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 +import org.apache.curator.framework.api.BackgroundCallback;
 +import org.apache.curator.framework.api.CreateBuilder;
 +import org.apache.curator.framework.api.CreateModable;
 +import org.apache.curator.framework.api.CuratorEvent;
 +import org.apache.curator.framework.api.CuratorWatcher;
 +import org.apache.curator.framework.state.ConnectionState;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.curator.utils.PathUtils;
++import org.apache.curator.utils.ThreadUtils;
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import java.io.Closeable;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +/**
 + * <p>
 + * A persistent node is a node that attempts to stay present in
 + * ZooKeeper, even through connection and session interruptions.
 + * </p>
 + * <p>
 + * The node is created in the background by {@link #start()}, re-created when it is
 + * deleted or when the connection is re-established, and deleted by {@link #close()}.
 + * </p>
 + * <p>
 + * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
 + * </p>
 + */
 +public class PersistentNode implements Closeable
 +{
 +    // released, and the reference cleared, once the initial create has been processed
 +    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    private final CuratorFramework client;
 +    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
 +    // path of the node as reported by the create callback; null until then and after close()
 +    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
 +    private final String basePath;
 +    private final CreateMode mode;
 +    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
 +    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
 +    private final AtomicBoolean authFailure = new AtomicBoolean(false);
 +    private final BackgroundCallback backgroundCallback;
 +    private final boolean useProtection;
 +
 +    /**
 +     * Watcher registered by {@link #watchNode()}: re-creates the node when it is deleted
 +     * and re-registers the watch when the node's data changes.
 +     */
 +    private final CuratorWatcher watcher = new CuratorWatcher()
 +    {
 +        @Override
 +        public void process(WatchedEvent event) throws Exception
 +        {
 +            if ( event.getType() == EventType.NodeDeleted )
 +            {
 +                createNode();
 +            }
 +            else if ( event.getType() == EventType.NodeDataChanged )
 +            {
 +                watchNode();
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Callback for the background existence check issued by {@link #watchNode()}.
 +     * A missing node is re-created; for an existing node the ephemeral flag is
 +     * compared with the requested mode and a mismatch is logged.
 +     */
 +    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
 +    {
 +        @Override
 +        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
 +        {
 +            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
 +            {
 +                createNode();
 +            }
 +            else if ( event.getStat() != null )
 +            {
 +                // Only inspect the Stat when there is one: failed calls (connection loss,
 +                // session expiry, ...) carry no Stat and used to cause a NullPointerException here.
 +                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
 +                if ( isEphemeral != mode.isEphemeral() )
 +                {
 +                    log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
 +                }
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Callback for the background setData() issued when the node already existed
 +     * at creation time.
 +     */
 +    private final BackgroundCallback setDataCallback = new BackgroundCallback()
 +    {
 +
 +        @Override
 +        public void processResult(CuratorFramework client, CuratorEvent event)
 +            throws Exception
 +        {
 +            //If the result is ok then initialisation is complete (if we're still initialising)
 +            //Don't retry on other errors as the only recoverable cases will be connection loss
 +            //and the node not existing, both of which are already handled by other watches.
 +            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
 +            {
 +                //Update is ok, mark initialisation as complete if required.
 +                initialisationComplete();
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Re-attempts node creation whenever the connection is re-established.
 +     */
 +    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
 +    {
 +        @Override
 +        public void stateChanged(CuratorFramework client, ConnectionState newState)
 +        {
 +            if ( newState == ConnectionState.RECONNECTED )
 +            {
 +                createNode();
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Lifecycle of this instance: LATENT until {@link #start()}, STARTED until {@link #close()}.
 +     */
 +    private enum State
 +    {
 +        LATENT,
 +        STARTED,
 +        CLOSED
 +    }
 +
 +    /**
 +     * Creates the instance without contacting ZooKeeper; nothing is created until
 +     * {@link #start()} is called.
 +     *
 +     * @param client        client instance, must not be null
 +     * @param mode          creation mode, must not be null
 +     * @param useProtection if true, call {@link CreateBuilder#withProtection()}
 +     * @param basePath      the base path for the node
 +     * @param initData      data for the node, must not be null; a copy is stored
 +     */
 +    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
 +    {
 +        this.useProtection = useProtection;
 +        this.client = Preconditions.checkNotNull(client, "client cannot be null");
 +        this.basePath = PathUtils.validatePath(basePath);
 +        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 +        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
 +
 +        // Callback for the background create issued by createNode()
 +        backgroundCallback = new BackgroundCallback()
 +        {
 +            @Override
 +            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
 +            {
 +                String path = null;
 +                boolean nodeExists = false;
 +                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
 +                {
 +                    path = event.getPath();
 +                    nodeExists = true;
 +                }
 +                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
 +                {
 +                    path = event.getName();
 +                }
 +                else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
 +                {
 +                    // not retried: record the failure so that it can be observed via isAuthFailure()
 +                    log.warn("Client does not have authorisation to write node at path {}", event.getPath());
 +                    authFailure.set(true);
 +                    return;
 +                }
 +                if ( path != null )
 +                {
 +                    authFailure.set(false);
 +                    nodePath.set(path);
 +                    watchNode();
 +
 +                    if ( nodeExists )
 +                    {
 +                        // the node was already there: overwrite its data with ours; the
 +                        // initial-create latch is released by setDataCallback on success
 +                        client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
 +                    }
 +                    else
 +                    {
 +                        initialisationComplete();
 +                    }
 +                }
 +                else
 +                {
 +                    // any other result code: try the create again
 +                    createNode();
 +                }
 +            }
 +        };
 +
 +        createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
 +        this.data.set(Arrays.copyOf(data, data.length));
 +    }
 +
 +    /**
 +     * Releases the initial-create latch, if it has not been released already.
 +     */
 +    private void initialisationComplete()
 +    {
 +        // getAndSet(null) guarantees the latch is counted down at most once
 +        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
 +        if ( localLatch != null )
 +        {
 +            localLatch.countDown();
 +        }
 +    }
 +
 +    /**
 +     * You must call start() to initiate the persistent node. An attempt to create the node
 +     * in the background will be started
 +     *
 +     * @throws IllegalStateException if this instance has already been started
 +     */
 +    public void start()
 +    {
 +        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
 +
 +        client.getConnectionStateListenable().addListener(connectionStateListener);
 +        createNode();
 +    }
 +
 +    /**
 +     * Block until either the initial node creation initiated by {@link #start()} succeeds or
 +     * the timeout elapses.
 +     *
 +     * @param timeout the maximum time to wait
 +     * @param unit    time unit
 +     * @return true if the node was created before the timeout elapsed
 +     * @throws InterruptedException  if the thread is interrupted
 +     * @throws IllegalStateException if this instance is not in the started state
 +     */
 +    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
 +    {
 +        Preconditions.checkState(state.get() == State.STARTED, "Not started");
 +
 +        // a null latch means the initial create has already completed
 +        CountDownLatch localLatch = initialCreateLatch.get();
 +        return (localLatch == null) || localLatch.await(timeout, unit);
 +    }
 +
 +    /**
 +     * Stops maintaining the node and deletes it. Does nothing unless this instance
 +     * is currently started.
 +     *
 +     * @throws IOException if deleting the node fails; the original exception is the cause
 +     */
 +    @Override
 +    public void close() throws IOException
 +    {
 +        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
 +        {
 +            return;
 +        }
 +
 +        client.getConnectionStateListenable().removeListener(connectionStateListener);
 +
 +        try
 +        {
 +            deleteNode();
 +        }
 +        catch ( Exception e )
 +        {
++            ThreadUtils.checkInterrupted(e);
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    /**
 +     * Returns the currently set path or null if the node does not exist
 +     *
 +     * @return node path or null
 +     */
 +    public String getActualPath()
 +    {
 +        return nodePath.get();
 +    }
 +
 +    /**
 +     * Set the data that the node should hold. A copy of the data is stored and, if this
 +     * instance is started, it is also written to the node in the background.
 +     *
 +     * @param data new data value, must not be null
 +     * @throws Exception errors
 +     */
 +    public void setData(byte[] data) throws Exception
 +    {
 +        data = Preconditions.checkNotNull(data, "data cannot be null");
 +        this.data.set(Arrays.copyOf(data, data.length));
 +        if ( isActive() )
 +        {
 +            // NOTE(review): getActualPath() is null until the initial create has been
 +            // processed - confirm how forPath() treats a null path
 +            client.setData().inBackground().forPath(getActualPath(), getData());
 +        }
 +    }
 +
 +    /**
 +     * Return the current value of our data. The internally held array is returned,
 +     * not a copy.
 +     *
 +     * @return our data
 +     */
 +    public byte[] getData()
 +    {
 +        return this.data.get();
 +    }
 +
 +    /**
 +     * Clears the recorded path and, if there was one, deletes that node. A node that
 +     * is already gone is not treated as an error.
 +     *
 +     * @throws Exception errors from the delete call
 +     */
 +    private void deleteNode() throws Exception
 +    {
 +        String localNodePath = nodePath.getAndSet(null);
 +        if ( localNodePath != null )
 +        {
 +            try
 +            {
 +                client.delete().guaranteed().forPath(localNodePath);
 +            }
 +            catch ( KeeperException.NoNodeException ignore )
 +            {
 +                // ignore
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Issues a background create for the node; the outcome is handled by
 +     * backgroundCallback. Does nothing unless this instance is started.
 +     */
 +    private void createNode()
 +    {
 +        if ( !isActive() )
 +        {
 +            return;
 +        }
 +
 +        try
 +        {
 +            String existingPath = nodePath.get();
 +            // re-use the previously recorded path, except with protection where the base path is always used
 +            String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
 +            createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
 +        }
 +        catch ( Exception e )
 +        {
++            ThreadUtils.checkInterrupted(e);
 +            throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
 +        }
 +    }
 +
 +    /**
 +     * Returns the mode to create the node with. Once a path has been recorded the
 +     * sequential modes are replaced by their non-sequential counterparts.
 +     *
 +     * @param pathIsSet true if a node path has already been recorded
 +     * @return the create mode to use
 +     */
 +    private CreateMode getCreateMode(boolean pathIsSet)
 +    {
 +        if ( pathIsSet )
 +        {
 +            switch ( mode )
 +            {
 +            default:
 +            {
 +                break;
 +            }
 +
 +            case EPHEMERAL_SEQUENTIAL:
 +            {
 +                return CreateMode.EPHEMERAL;    // protection case - node already set
 +            }
 +
 +            case PERSISTENT_SEQUENTIAL:
 +            {
 +                return CreateMode.PERSISTENT;    // protection case - node already set
 +            }
 +            }
 +        }
 +        return mode;
 +    }
 +
 +    /**
 +     * Registers the watcher on the recorded node path via a background existence check.
 +     * Does nothing unless this instance is started and a path has been recorded.
 +     *
 +     * @throws Exception errors from the checkExists call
 +     */
 +    private void watchNode() throws Exception
 +    {
 +        if ( !isActive() )
 +        {
 +            return;
 +        }
 +
 +        String localNodePath = nodePath.get();
 +        if ( localNodePath != null )
 +        {
 +            client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
 +        }
 +    }
 +
 +    /**
 +     * @return true if this instance has been started and not yet closed
 +     */
 +    private boolean isActive()
 +    {
 +        return (state.get() == State.STARTED);
 +    }
 +
 +    /**
 +     * @return true if the most recent create attempt was rejected with NOAUTH
 +     */
 +    @VisibleForTesting
 +    boolean isAuthFailure()
 +    {
 +        return authFailure.get();
 +    }
 +}