You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/18 01:30:00 UTC
[9/9] curator git commit: Merge branch 'master' into CURATOR-3.0
Merge branch 'master' into CURATOR-3.0
Conflicts:
curator-client/src/main/java/org/apache/curator/RetryLoop.java
curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9a03ea93
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9a03ea93
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9a03ea93
Branch: refs/heads/CURATOR-3.0
Commit: 9a03ea93937af047e8ad13c2e3e3559520abfb0a
Parents: 24aa3c3 36a72d9
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 17 17:40:52 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 17 17:40:52 2016 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 4 ++
.../apache/curator/CuratorZookeeperClient.java | 2 +
.../apache/curator/SessionFailRetryLoop.java | 2 +
.../ClassicConnectionHandlingPolicy.java | 2 +
.../StandardConnectionHandlingPolicy.java | 2 +
.../exhibitor/ExhibitorEnsembleProvider.java | 2 +
.../org/apache/curator/utils/ThreadUtils.java | 23 ++++++++++
.../src/main/java/locking/LockingExample.java | 7 ++-
.../curator/framework/imps/Backgrounding.java | 2 +
.../framework/imps/CreateBuilderImpl.java | 3 ++
.../framework/imps/CuratorFrameworkImpl.java | 46 +++++++++++++-------
.../framework/imps/DeleteBuilderImpl.java | 2 +
.../framework/imps/FailedOperationManager.java | 2 +
.../FindAndDeleteProtectedNodeInBackground.java | 3 ++
.../framework/imps/GetDataBuilderImpl.java | 2 +
.../curator/framework/imps/NamespaceImpl.java | 2 +
.../framework/imps/NamespaceWatcher.java | 2 +
.../framework/imps/OperationAndData.java | 13 ++++--
.../framework/listen/ListenerContainer.java | 2 +
.../framework/state/ConnectionStateManager.java | 14 +++---
.../recipes/AfterConnectionEstablished.java | 1 +
.../framework/recipes/cache/NodeCache.java | 4 ++
.../recipes/cache/PathChildrenCache.java | 4 ++
.../framework/recipes/cache/TreeCache.java | 7 +++
.../framework/recipes/leader/LeaderLatch.java | 5 +++
.../recipes/leader/LeaderSelector.java | 10 ++---
.../framework/recipes/locks/ChildReaper.java | 1 +
.../recipes/locks/InterProcessMultiLock.java | 4 ++
.../recipes/locks/InterProcessSemaphore.java | 4 ++
.../recipes/locks/InterProcessSemaphoreV2.java | 2 +
.../framework/recipes/locks/LockInternals.java | 2 +
.../curator/framework/recipes/locks/Reaper.java | 1 +
.../framework/recipes/nodes/GroupMember.java | 3 ++
.../recipes/nodes/PersistentEphemeralNode.java | 3 ++
.../recipes/queue/DistributedQueue.java | 43 ++++++++++--------
.../framework/recipes/queue/QueueSharder.java | 16 ++++---
.../framework/recipes/shared/SharedValue.java | 2 +
...estResetConnectionWithBackgroundFailure.java | 37 +++++++---------
.../curator/test/TestingZooKeeperMain.java | 24 ++++++++--
.../entity/JsonServiceInstanceMarshaller.java | 3 ++
.../entity/JsonServiceInstancesMarshaller.java | 2 +
.../server/rest/DiscoveryResource.java | 6 +++
.../discovery/server/rest/InstanceCleanup.java | 2 +
.../discovery/details/ServiceDiscoveryImpl.java | 3 ++
.../x/rpc/idl/discovery/DiscoveryService.java | 8 ++++
.../idl/discovery/DiscoveryServiceLowLevel.java | 7 +++
.../idl/services/CuratorProjectionService.java | 25 +++++++++++
47 files changed, 286 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 0b21643,dc6ac53..d79ec58
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@@ -18,11 -18,11 +18,12 @@@
*/
package org.apache.curator;
-import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.DebugUtils;
+ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/SessionFailRetryLoop.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index f620ffb,0000000..fe24b42
mode 100644,000000..100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@@ -1,86 -1,0 +1,88 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
++import org.apache.curator.utils.ThreadUtils;
+import java.util.concurrent.Callable;
+
+/**
+ * Emulates the pre 3.0.0 Curator connection handling
+ */
+public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
+{
+ @Override
+ public int getSimulatedSessionExpirationPercent()
+ {
+ return 0;
+ }
+
+ @Override
+ public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
+ {
+ T result = null;
+ RetryLoop retryLoop = client.newRetryLoop();
+ while ( retryLoop.shouldContinue() )
+ {
+ try
+ {
+ client.internalBlockUntilConnectedOrTimedOut();
+ result = proc.call();
+ retryLoop.markComplete();
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e);
+ retryLoop.takeException(e);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+ {
+ CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
+ int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+ long elapsed = System.currentTimeMillis() - connectionStartMs;
+ if ( elapsed >= minTimeout )
+ {
+ if ( hasNewConnectionString.call() != null )
+ {
+ result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
+ }
+ else
+ {
+ int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+ if ( elapsed > maxTimeout )
+ {
+ result = CheckTimeoutsResult.RESET_CONNECTION;
+ }
+ else
+ {
+ result = CheckTimeoutsResult.CONNECTION_TIMEOUT;
+ }
+ }
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 6995815,0000000..8f7a438
mode 100644,000000..100644
--- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@@ -1,87 -1,0 +1,89 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.connection;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
++import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+
+/**
+ * Curator's standard connection handling since 3.0.0
+ *
+ * @since 3.0.0
+ */
+public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final int expirationPercent;
+
+ public StandardConnectionHandlingPolicy()
+ {
+ this(100);
+ }
+
+ public StandardConnectionHandlingPolicy(int expirationPercent)
+ {
+ Preconditions.checkArgument((expirationPercent > 0) && (expirationPercent <= 100), "expirationPercent must be > 0 and <= 100");
+ this.expirationPercent = expirationPercent;
+ }
+
+ @Override
+ public int getSimulatedSessionExpirationPercent()
+ {
+ return expirationPercent;
+ }
+
+ @Override
+ public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
+ {
+ client.internalBlockUntilConnectedOrTimedOut();
+
+ T result = null;
+ RetryLoop retryLoop = client.newRetryLoop();
+ while ( retryLoop.shouldContinue() )
+ {
+ try
+ {
+ result = proc.call();
+ retryLoop.markComplete();
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e);
+ retryLoop.takeException(e);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
+ {
+ if ( hasNewConnectionString.call() != null )
+ {
+ return CheckTimeoutsResult.NEW_CONNECTION_STRING;
+ }
+ return CheckTimeoutsResult.NOP;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 5622508,eeb057d..ada4bae
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@@ -25,8 -25,10 +25,9 @@@ import com.google.common.collect.Iterab
import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.api.*;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 10bed18,c3247a1..ab72308
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@@ -27,10 -27,11 +27,11 @@@ import org.apache.curator.framework.api
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.DeleteBuilderMain;
import org.apache.curator.framework.api.Pathable;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
index 405561b,0000000..c09e2ec
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@@ -1,68 -1,0 +1,70 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+abstract class FailedOperationManager<T>
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ protected final CuratorFramework client;
+
+ @VisibleForTesting
+ volatile FailedOperationManagerListener<T> debugListener = null;
+
+ interface FailedOperationManagerListener<T>
+ {
+ public void pathAddedForGuaranteedOperation(T detail);
+ }
+
+ FailedOperationManager(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ void addFailedOperation(T details)
+ {
+ if ( debugListener != null )
+ {
+ debugListener.pathAddedForGuaranteedOperation(details);
+ }
+
+
+ if ( client.getState() == CuratorFrameworkState.STARTED )
+ {
+ log.debug("Details being added to guaranteed operation set: " + details);
+ try
+ {
+ executeGuaranteedOperationInBackground(details);
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e);
+ addFailedOperation(details);
+ }
+ }
+ }
+
+ protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index b46cddb,279eece..5f7b985
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@@ -38,9 -38,8 +38,9 @@@ class OperationAndData<T> implements De
private final ErrorCallback<T> errorCallback;
private final AtomicInteger retryCount = new AtomicInteger(0);
private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
- private final long ordinal = nextOrdinal.getAndIncrement();
+ private final AtomicLong ordinal = new AtomicLong();
private final Object context;
+ private final boolean connectionRequired;
interface ErrorCallback<T>
{
@@@ -54,14 -53,15 +54,21 @@@
this.callback = callback;
this.errorCallback = errorCallback;
this.context = context;
+ this.connectionRequired = connectionRequired;
- }
+ reset();
+ }
+
+ void reset()
+ {
+ retryCount.set(0);
+ ordinal.set(nextOrdinal.getAndIncrement());
+ }
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+ {
+ this(operation, data, callback, errorCallback, context, true);
+ }
+
Object getContext()
{
return context;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index b6f2e02,8cc37aa..cbb8d16
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@@ -246,78 -238,36 +246,80 @@@ public class ConnectionStateManager imp
private void processEvents()
{
- try
+ while ( state.get() == State.STARTED )
{
- while ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) )
+ try
{
- final ConnectionState newState = eventQueue.take();
-
- if ( listeners.size() == 0 )
+ int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
+ int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
+ int pollMaxMs = (useSessionTimeoutMs * 2) / 3; // 2/3 of session timeout
+ final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
+ if ( newState != null )
{
- log.warn("There are no ConnectionStateListeners registered.");
- }
+ if ( listeners.size() == 0 )
+ {
+ log.warn("There are no ConnectionStateListeners registered.");
+ }
- listeners.forEach
- (
- new Function<ConnectionStateListener, Void>()
- {
- @Override
- public Void apply(ConnectionStateListener listener)
+ listeners.forEach
+ (
+ new Function<ConnectionStateListener, Void>()
{
- listener.stateChanged(client, newState);
- return null;
+ @Override
+ public Void apply(ConnectionStateListener listener)
+ {
+ listener.stateChanged(client, newState);
+ return null;
+ }
}
- }
- );
+ );
+ }
+ else if ( sessionExpirationPercent > 0 )
+ {
+ synchronized(this)
+ {
+ checkSessionExpiration();
+ }
+ }
}
- }
- catch ( InterruptedException e )
- {
- Thread.currentThread().interrupt();
+ catch ( InterruptedException e )
+ {
+ // swallow the interrupt as it's only possible from either a background
+ // operation and, thus, doesn't apply to this loop or the instance
+ // is being closed in which case the while test will get it
+ }
}
}
+
+ private void checkSessionExpiration()
+ {
+ if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) )
+ {
+ long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
+ int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
+ int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
+ useSessionTimeoutMs = (useSessionTimeoutMs * sessionExpirationPercent) / 100;
+ if ( elapsedMs >= useSessionTimeoutMs )
+ {
+ log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
+ try
+ {
+ // LOL - this method was proposed by me (JZ) in 2013 for totally unrelated reasons
+ // it got added to ZK 3.5 and now does exactly what we need
+ // https://issues.apache.org/jira/browse/ZOOKEEPER-1730
+ client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not inject session expiration", e);
+ }
+ }
+ }
+ }
+
+ private void setCurrentConnectionState(ConnectionState newConnectionState)
+ {
+ currentConnectionState = newConnectionState;
+ startOfSuspendedEpoch = (currentConnectionState == ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 3bf2ec3,8524075..36dbff4
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@@ -30,8 -31,7 +30,9 @@@ import org.apache.curator.framework.imp
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 4b0da11,f712945..dc2f681
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@@ -28,7 -27,9 +28,8 @@@ import org.apache.curator.framework.Wat
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 881cde7,cc1159a..c5e9cc2
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@@ -329,10 -326,9 +330,11 @@@ public class PersistentEphemeralNode im
}
catch ( Exception e )
{
+ ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
+
+ client.removeWatchers();
}
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
----------------------------------------------------------------------
diff --cc curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index b2389e9,7487557..19535a6
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@@ -27,35 -23,41 +27,53 @@@ import org.apache.zookeeper.server.Serv
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.management.JMException;
import java.io.IOException;
import java.lang.reflect.Field;
- import java.lang.reflect.Modifier;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public class TestingZooKeeperMain extends ZooKeeperServerMain implements ZooKeeperMainFace
+public class TestingZooKeeperMain implements ZooKeeperMainFace
{
+ private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperMain.class);
+
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Exception> startingException = new AtomicReference<Exception>(null);
- private static final int MAX_WAIT_MS;
+ private volatile ServerCnxnFactory cnxnFactory;
+ private volatile TestZooKeeperServer zkServer;
+ private volatile ContainerManager containerManager;
+
+ private static final Timing timing = new Timing();
- static final int MAX_WAIT_MS = timing.milliseconds();
++ static final int MAX_WAIT_MS;
+ static
+ {
+ long startMs = System.currentTimeMillis();
+ try
+ {
+ // this can take forever and fails tests - ZK calls it internally so there's nothing we can do
+ // pre flight it and use it to calculate max wait
+ //noinspection ResultOfMethodCallIgnored
+ InetAddress.getLocalHost().getCanonicalHostName();
+ }
+ catch ( UnknownHostException e )
+ {
+ // ignore
+ }
+ long elapsed = System.currentTimeMillis() - startMs;
+ MAX_WAIT_MS = Math.max((int)elapsed * 2, 1000);
+ }
@Override
public void kill()
@@@ -195,95 -169,39 +213,95 @@@
{
e.printStackTrace(); // just ignore - this class is only for testing
}
+ finally
+ {
+ zkServer = null;
+ }
}
- private ServerCnxnFactory getServerConnectionFactory() throws Exception
+ // copied from ZooKeeperServerMain.java
+ private void internalRunFromConfig(ServerConfig config) throws IOException
{
- Field cnxnFactoryField = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory");
- cnxnFactoryField.setAccessible(true);
- ServerCnxnFactory cnxnFactory;
+ log.info("Starting server");
+ FileTxnSnapLog txnLog = null;
+ try {
+ // Note that this thread isn't going to be doing anything else,
+ // so rather than spawning another thread, we will just call
+ // run() in this thread.
+ // create a file logger url from the command line args
+ txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+ zkServer = new TestZooKeeperServer(txnLog, config);
- // Wait until the cnxnFactory field is non-null or up to 1s, whichever comes first.
- long startTime = System.currentTimeMillis();
- do
- {
- cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(this);
+ try
+ {
+ cnxnFactory = ServerCnxnFactory.createFactory();
+ cnxnFactory.configure(config.getClientPortAddress(),
+ config.getMaxClientCnxns());
+ }
+ catch ( IOException e )
+ {
+ log.info("Could not server. Waiting and trying one more time.", e);
+ timing.sleepABit();
+ cnxnFactory = ServerCnxnFactory.createFactory();
+ cnxnFactory.configure(config.getClientPortAddress(),
+ config.getMaxClientCnxns());
+ }
+ cnxnFactory.startup(zkServer);
- containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.getFirstProcessor(), Integer.getInteger("znode.container.checkIntervalMs", (int)TimeUnit.MINUTES.toMillis(1L)).intValue(), Integer.getInteger("znode.container.maxPerMinute", 10000).intValue());
++ containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.getFirstProcessor(), Integer.getInteger("znode.container.checkIntervalMs", (int)TimeUnit.MINUTES.toMillis(1L)), Integer.getInteger("znode.container.maxPerMinute", 10000));
+ containerManager.start();
+ latch.countDown();
+ cnxnFactory.join();
+ if ( (zkServer != null) && zkServer.isRunning()) {
+ zkServer.shutdown();
+ }
+ } catch (InterruptedException e) {
+ // warn, but generally this is ok
+ Thread.currentThread().interrupt();
+ log.warn("Server interrupted", e);
+ } finally {
+ if (txnLog != null) {
+ txnLog.close();
+ }
}
- while ( (cnxnFactory == null) && ((System.currentTimeMillis() - startTime) < MAX_WAIT_MS) );
-
- return cnxnFactory;
}
- private ZooKeeperServer getZooKeeperServer(ServerCnxnFactory cnxnFactory) throws Exception
+ public static class TestZooKeeperServer extends ZooKeeperServer
{
- Field zkServerField = ServerCnxnFactory.class.getDeclaredField("zkServer");
- zkServerField.setAccessible(true);
- ZooKeeperServer zkServer;
+ public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
+ {
+ super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+ }
- // Wait until the zkServer field is non-null or up to 1s, whichever comes first.
- long startTime = System.currentTimeMillis();
- do
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ public RequestProcessor getFirstProcessor()
+ {
+ return firstProcessor;
+ }
+
+ protected void registerJMX()
+ {
+ // NOP
+ }
+
+ @Override
+ protected void unregisterJMX()
{
- zkServer = (ZooKeeperServer)zkServerField.get(cnxnFactory);
+ // NOP
}
- while ( (zkServer == null) && ((System.currentTimeMillis() - startTime) < MAX_WAIT_MS) );
- return zkServer;
+ @Override
+ public boolean isRunning()
+ {
+ return isRunning.get() || super.isRunning();
+ }
+
+ public void noteStartup()
+ {
+ synchronized (this) {
+ isRunning.set(true);
+ this.notifyAll();
+ }
+ }
}
}