You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/08/18 23:00:15 UTC
svn commit: r1618732 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: ivank
Date: Mon Aug 18 21:00:15 2014
New Revision: 1618732
URL: http://svn.apache.org/r1618732
Log:
BOOKKEEPER-704: reconnectable zookeeper client wrapper (sijie via ivank)
Change-Id: I00c73788f4ed5911713906b4d7622ca6dcec79a5
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1618732&r1=1618731&r2=1618732&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Aug 18 21:00:15 2014
@@ -212,6 +212,8 @@ Trunk (unreleased changes)
BOOKKEEPER-774: Flaky test org.apache.bookkeeper.test.ReadOnlyBookieTest.testBookieShouldTurnWritableFromReadOnly (sijie)
+ BOOKKEEPER-704: reconnectable zookeeper client wrapper (sijie via ivank)
+
bookkeeper-benchmark:
BOOKKEEPER-768: fix typo 'seconds' to milliseconds in benchmark output (jialin via sijie)
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java?rev=1618732&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java Mon Aug 18 21:00:15 2014
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+/**
+ * Retry policy that retries a set number of times with an increasing (up to a
+ * maximum bound) backoff time between retries.
+ */
+public class BoundExponentialBackoffRetryPolicy extends ExponentialBackoffRetryPolicy {
+
+ private final long maxBackoffTime;
+
+ public BoundExponentialBackoffRetryPolicy(long baseBackoffTime, long maxBackoffTime, int maxRetries) {
+ super(baseBackoffTime, maxRetries);
+ this.maxBackoffTime = maxBackoffTime;
+ }
+
+ @Override
+ public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) {
+ return Math.min(maxBackoffTime, super.nextRetryWaitTime(retryCount, elapsedRetryTime));
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java?rev=1618732&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java Mon Aug 18 21:00:15 2014
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.util.Random;
+
+public class ExponentialBackoffRetryPolicy implements RetryPolicy {
+
+ private final Random random;
+ private final int maxRetries;
+ private final long baseBackoffTime;
+
+ public ExponentialBackoffRetryPolicy(long baseBackoffTime, int maxRetries) {
+ this.maxRetries = maxRetries;
+ this.baseBackoffTime = baseBackoffTime;
+ this.random = new Random(System.currentTimeMillis());
+ }
+
+ @Override
+ public boolean allowRetry(int retryCount, long elapsedRetryTime) {
+ return retryCount <= maxRetries;
+ }
+
+ @Override
+ public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) {
+ return baseBackoffTime * Math.max(1, random.nextInt(1 << (retryCount + 1)));
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java?rev=1618732&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java Mon Aug 18 21:00:15 2014
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+/**
+ * Interface of the policy to use when retrying operations.
+ */
+public interface RetryPolicy {
+
+ /**
+ * Called when retrying an operation failed for some reason. Return true if
+ * another attempt is allowed to make.
+ *
+ * @param retryCount
+ * The number of times retried so far (1 for the first time).
+ * @param elapsedRetryTime
+ * The elapsed time since the operation attempted. (in
+ * milliseconds)
+ * @return true if anther attempt is allowed to make. otherwise, false.
+ */
+ public boolean allowRetry(int retryCount, long elapsedRetryTime);
+
+ /**
+ * Called before making an attempt to retry a failed operation. Return 0 if
+ * an attempt needs to be made immediately.
+ *
+ * @param retryCount
+ * The number of times retried so far (0 for the first time).
+ * @param elapsedRetryTime
+ * The elapsed time since the operation attempted. (in
+ * milliseconds)
+ * @return the elapsed time that the attempt needs to wait before retrying.
+ * (in milliseconds)
+ */
+ public long nextRetryWaitTime(int retryCount, long elapsedRetryTime);
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java?rev=1618732&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java Mon Aug 18 21:00:15 2014
@@ -0,0 +1,1046 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable;
+import org.apache.zookeeper.AsyncCallback.ACLCallback;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a zookeeper client to handle session expire
+ */
+public class ZooKeeperClient extends ZooKeeper implements Watcher {
+
+ final static Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class);
+
+ // ZooKeeper client connection variables
+ private final String connectString;
+ private final int sessionTimeoutMs;
+
+ // state for the zookeeper client
+ private final AtomicReference<ZooKeeper> zk = new AtomicReference<ZooKeeper>();
+ private final ZooKeeperWatcherBase watcherManager;
+
+ private final ScheduledExecutorService retryExecutor;
+ private final ExecutorService connectExecutor;
+
+ // retry polices
+ private final RetryPolicy connectRetryPolicy;
+ private final RetryPolicy operationRetryPolicy;
+
+ private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
+
+ @Override
+ public ZooKeeper call() throws Exception {
+ try {
+ return ZooWorker.syncCallWithRetries(null, new ZooCallable<ZooKeeper>() {
+
+ @Override
+ public ZooKeeper call() throws KeeperException, InterruptedException {
+ logger.info("Reconnecting zookeeper {}.", connectString);
+ ZooKeeper newZk;
+ try {
+ newZk = createZooKeeper();
+ } catch (IOException ie) {
+ logger.error("Failed to create zookeeper instance to " + connectString, ie);
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
+ // close the previous one
+ closeZkHandle();
+ zk.set(newZk);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ZooKeeper session {} is created to {}.",
+ Long.toHexString(newZk.getSessionId()), connectString);
+ }
+ return newZk;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ZooKeeper Client Creator (%s)", connectString);
+ }
+
+ }, connectRetryPolicy);
+ } catch (Exception e) {
+ logger.error("Gave up reconnecting to ZooKeeper : ", e);
+ Runtime.getRuntime().exit(-1);
+ return null;
+ }
+ }
+
+ };
+
+ public static ZooKeeper createConnectedZooKeeper(String connectString, int sessionTimeoutMs)
+ throws KeeperException, InterruptedException, IOException {
+ ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(sessionTimeoutMs);
+ ZooKeeper zk = new ZooKeeper(connectString, sessionTimeoutMs, watcher);
+ try {
+ watcher.waitForConnection();
+ } catch (KeeperException ke) {
+ zk.close();
+ throw ke;
+ } catch (InterruptedException ie) {
+ zk.close();
+ throw ie;
+ }
+ return zk;
+ }
+
+ public static ZooKeeperClient createConnectedZooKeeperClient(String connectString, int sessionTimeoutMs)
+ throws KeeperException, InterruptedException, IOException {
+ ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs);
+ ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
+ new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0));
+ try {
+ watcherManager.waitForConnection();
+ } catch (KeeperException ke) {
+ client.close();
+ throw ke;
+ } catch (InterruptedException ie) {
+ client.close();
+ throw ie;
+ }
+ return client;
+ }
+
+ public static ZooKeeperClient createConnectedZooKeeperClient(
+ String connectString, int sessionTimeoutMs, RetryPolicy operationRetryPolicy)
+ throws KeeperException, InterruptedException, IOException {
+ ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs);
+ ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
+ operationRetryPolicy);
+ try {
+ watcherManager.waitForConnection();
+ } catch (KeeperException ke) {
+ client.close();
+ throw ke;
+ } catch (InterruptedException ie) {
+ client.close();
+ throw ie;
+ }
+ return client;
+ }
+
+ public static ZooKeeperClient createConnectedZooKeeperClient(
+ String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
+ RetryPolicy operationRetryPolicy)
+ throws KeeperException, InterruptedException, IOException {
+ ZooKeeperWatcherBase watcherManager =
+ new ZooKeeperWatcherBase(sessionTimeoutMs, childWatchers);
+ ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager,
+ operationRetryPolicy);
+ try {
+ watcherManager.waitForConnection();
+ } catch (KeeperException ke) {
+ client.close();
+ throw ke;
+ } catch (InterruptedException ie) {
+ client.close();
+ throw ie;
+ }
+ return client;
+ }
+
+ ZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcherManager,
+ RetryPolicy operationRetryPolicy) throws IOException {
+ this(connectString, sessionTimeoutMs, watcherManager,
+ new BoundExponentialBackoffRetryPolicy(6000, 60000, Integer.MAX_VALUE),
+ operationRetryPolicy);
+ }
+
+ private ZooKeeperClient(String connectString, int sessionTimeoutMs,
+ ZooKeeperWatcherBase watcherManager,
+ RetryPolicy connectRetryPolicy, RetryPolicy operationRetryPolicy) throws IOException {
+ super(connectString, sessionTimeoutMs, watcherManager);
+ this.connectString = connectString;
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ this.watcherManager = watcherManager;
+ this.connectRetryPolicy = connectRetryPolicy;
+ this.operationRetryPolicy = operationRetryPolicy;
+ this.retryExecutor =
+ Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+ this.connectExecutor =
+ Executors.newSingleThreadExecutor();
+ // added itself to the watcher
+ watcherManager.addChildWatcher(this);
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ connectExecutor.shutdown();
+ retryExecutor.shutdown();
+ closeZkHandle();
+ }
+
+ private void closeZkHandle() throws InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ super.close();
+ } else {
+ zkHandle.close();
+ }
+ }
+
+ protected void waitForConnection() throws KeeperException, InterruptedException {
+ watcherManager.waitForConnection();
+ }
+
+ protected ZooKeeper createZooKeeper() throws IOException {
+ return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager);
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == EventType.None &&
+ event.getState() == KeeperState.Expired) {
+ onExpired();
+ }
+ }
+
+ private void onExpired() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ZooKeeper session {} is expired from {}.",
+ Long.toHexString(getSessionId()), connectString);
+ }
+ try {
+ connectExecutor.submit(clientCreator);
+ } catch (RejectedExecutionException ree) {
+ logger.error("ZooKeeper reconnect task is rejected : ", ree);
+ }
+ }
+
+
+ static abstract class RetryRunnable implements Runnable {
+
+ final ZooWorker worker;
+ final Runnable that;
+
+ RetryRunnable(RetryPolicy retryPolicy) {
+ worker = new ZooWorker(retryPolicy);
+ that = this;
+ }
+
+ }
+
+ // inherits from ZooKeeper client for all operations
+
+ @Override
+ public long getSessionId() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return super.getSessionId();
+ }
+ return zkHandle.getSessionId();
+ }
+
+ @Override
+ public byte[] getSessionPasswd() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return super.getSessionPasswd();
+ }
+ return zkHandle.getSessionPasswd();
+ }
+
+ @Override
+ public int getSessionTimeout() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return super.getSessionTimeout();
+ }
+ return zkHandle.getSessionTimeout();
+ }
+
+ @Override
+ public void addAuthInfo(String scheme, byte[] auth) {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ super.addAuthInfo(scheme, auth);
+ return;
+ }
+ zkHandle.addAuthInfo(scheme, auth);
+ }
+
+ @Override
+ public synchronized void register(Watcher watcher) {
+ watcherManager.addChildWatcher(watcher);
+ }
+
+ @Override
+ public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<OpResult>>() {
+
+ @Override
+ public List<OpResult> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.multi(ops);
+ }
+ return zkHandle.multi(ops);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ @Deprecated
+ public Transaction transaction() {
+ // since there is no reference about which client that the transaction could use
+ // so just use ZooKeeper instance directly.
+ // you'd better to use {@link #multi}.
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return super.transaction();
+ }
+ return zkHandle.transaction();
+ }
+
+ @Override
+ public List<ACL> getACL(final String path, final Stat stat) throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<ACL>>() {
+
+ @Override
+ public List<ACL> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getACL(path, stat);
+ }
+ return zkHandle.getACL(path, stat);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final ACLCallback aclCb = new ACLCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, acl, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getACL(path, stat, aclCb, worker);
+ } else {
+ zkHandle.getACL(path, stat, aclCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public Stat setACL(final String path, final List<ACL> acl, final int version)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+ @Override
+ public Stat call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.setACL(path, acl, version);
+ }
+ return zkHandle.setACL(path, acl, version);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void setACL(final String path, final List<ACL> acl, final int version,
+ final StatCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final StatCallback stCb = new StatCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.setACL(path, acl, version, stCb, worker);
+ } else {
+ zkHandle.setACL(path, acl, version, stCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void sync(final String path, final VoidCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final VoidCallback vCb = new VoidCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.sync(path, vCb, worker);
+ } else {
+ zkHandle.sync(path, vCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public States getState() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getState();
+ } else {
+ return zkHandle.getState();
+ }
+ }
+
+ @Override
+ public String toString() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.toString();
+ } else {
+ return zkHandle.toString();
+ }
+ }
+
+ @Override
+ public String create(final String path, final byte[] data,
+ final List<ACL> acl, final CreateMode createMode)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
+
+ @Override
+ public String call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.create(path, data, acl, createMode);
+ }
+ return zkHandle.create(path, data, acl, createMode);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void create(final String path, final byte[] data, final List<ACL> acl,
+ final CreateMode createMode, final StringCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final StringCallback createCb = new StringCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, name);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
+ } else {
+ zkHandle.create(path, data, acl, createMode, createCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void delete(final String path, final int version) throws KeeperException, InterruptedException {
+ ZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
+
+ @Override
+ public Void call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.delete(path, version);
+ } else {
+ zkHandle.delete(path, version);
+ }
+ return null;
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void delete(final String path, final int version, final VoidCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final VoidCallback deleteCb = new VoidCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.delete(path, version, deleteCb, worker);
+ } else {
+ zkHandle.delete(path, version, deleteCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+ @Override
+ public Stat call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.exists(path, watcher);
+ }
+ return zkHandle.exists(path, watcher);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+ @Override
+ public Stat call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.exists(path, watch);
+ }
+ return zkHandle.exists(path, watch);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final StatCallback stCb = new StatCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.exists(path, watcher, stCb, worker);
+ } else {
+ zkHandle.exists(path, watcher, stCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final StatCallback stCb = new StatCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.exists(path, watch, stCb, worker);
+ } else {
+ zkHandle.exists(path, watch, stCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public byte[] getData(final String path, final Watcher watcher, final Stat stat)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
+
+ @Override
+ public byte[] call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getData(path, watcher, stat);
+ }
+ return zkHandle.getData(path, watcher, stat);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public byte[] getData(final String path, final boolean watch, final Stat stat)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
+
+ @Override
+ public byte[] call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getData(path, watch, stat);
+ }
+ return zkHandle.getData(path, watch, stat);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final DataCallback dataCb = new DataCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, data, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getData(path, watcher, dataCb, worker);
+ } else {
+ zkHandle.getData(path, watcher, dataCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final DataCallback dataCb = new DataCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, data, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getData(path, watch, dataCb, worker);
+ } else {
+ zkHandle.getData(path, watch, dataCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public Stat setData(final String path, final byte[] data, final int version)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+ @Override
+ public Stat call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.setData(path, data, version);
+ }
+ return zkHandle.setData(path, data, version);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void setData(final String path, final byte[] data, final int version,
+ final StatCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final StatCallback stCb = new StatCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.setData(path, data, version, stCb, worker);
+ } else {
+ zkHandle.setData(path, data, version, stCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public List<String> getChildren(final String path, final Watcher watcher, final Stat stat)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
+
+ @Override
+ public List<String> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getChildren(path, watcher, stat);
+ }
+ return zkHandle.getChildren(path, watcher, stat);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public List<String> getChildren(final String path, final boolean watch, final Stat stat)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
+
+ @Override
+ public List<String> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getChildren(path, watch, stat);
+ }
+ return zkHandle.getChildren(path, watch, stat);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void getChildren(final String path, final Watcher watcher,
+ final Children2Callback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final Children2Callback childCb = new Children2Callback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> children, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, children, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
+ } else {
+ zkHandle.getChildren(path, watcher, childCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void getChildren(final String path, final boolean watch, final Children2Callback cb,
+ final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final Children2Callback childCb = new Children2Callback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> children, Stat stat) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, children, stat);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
+ } else {
+ zkHandle.getChildren(path, watch, childCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+
+ @Override
+ public List<String> getChildren(final String path, final Watcher watcher)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
+
+ @Override
+ public List<String> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getChildren(path, watcher);
+ }
+ return zkHandle.getChildren(path, watcher);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public List<String> getChildren(final String path, final boolean watch)
+ throws KeeperException, InterruptedException {
+ return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
+
+ @Override
+ public List<String> call() throws KeeperException, InterruptedException {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ return ZooKeeperClient.super.getChildren(path, watch);
+ }
+ return zkHandle.getChildren(path, watch);
+ }
+
+ }, operationRetryPolicy);
+ }
+
+ @Override
+ public void getChildren(final String path, final Watcher watcher,
+ final ChildrenCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final ChildrenCallback childCb = new ChildrenCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> children) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, children);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
+ } else {
+ zkHandle.getChildren(path, watcher, childCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+ @Override
+ public void getChildren(final String path, final boolean watch,
+ final ChildrenCallback cb, final Object context) {
+ final Runnable proc = new RetryRunnable(operationRetryPolicy) {
+
+ final ChildrenCallback childCb = new ChildrenCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> children) {
+ ZooWorker worker = (ZooWorker)ctx;
+ if (worker.allowRetry(rc)) {
+ retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS);
+ } else {
+ cb.processResult(rc, path, context, children);
+ }
+ }
+
+ };
+
+ @Override
+ public void run() {
+ ZooKeeper zkHandle = zk.get();
+ if (null == zkHandle) {
+ ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
+ } else {
+ zkHandle.getChildren(path, watch, childCb, worker);
+ }
+ }
+ };
+ // execute it immediately
+ proc.run();
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java?rev=1618732&r1=1618731&r2=1618732&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java Mon Aug 18 21:00:15 2014
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.zookeeper;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -39,17 +41,36 @@ public class ZooKeeperWatcherBase implem
private final int zkSessionTimeOut;
private CountDownLatch clientConnectLatch = new CountDownLatch(1);
+ private final CopyOnWriteArraySet<Watcher> childWatchers =
+ new CopyOnWriteArraySet<Watcher>();
public ZooKeeperWatcherBase(int zkSessionTimeOut) {
this.zkSessionTimeOut = zkSessionTimeOut;
}
+ public ZooKeeperWatcherBase(int zkSessionTimeOut, Set<Watcher> childWatchers) {
+ this.zkSessionTimeOut = zkSessionTimeOut;
+ this.childWatchers.addAll(childWatchers);
+ }
+
+ public ZooKeeperWatcherBase addChildWatcher(Watcher watcher) {
+ this.childWatchers.add(watcher);
+ return this;
+ }
+
+ public ZooKeeperWatcherBase removeChildWatcher(Watcher watcher) {
+ this.childWatchers.remove(watcher);
+ return this;
+ }
+
@Override
public void process(WatchedEvent event) {
// If event type is NONE, this is a connection status change
if (event.getType() != EventType.None) {
LOG.debug("Recieved event: {}, path: {} from ZooKeeper server",
event.getType(), event.getPath());
+ // notify the child watchers
+ notifyEvent(event);
return;
}
@@ -60,16 +81,19 @@ public class ZooKeeperWatcherBase implem
clientConnectLatch.countDown();
break;
case Disconnected:
+ clientConnectLatch = new CountDownLatch(1);
LOG.debug("Ignoring Disconnected event from ZooKeeper server");
break;
case Expired:
- LOG.error("ZooKeeper client connection to the "
- + "ZooKeeper server has expired!");
+ clientConnectLatch = new CountDownLatch(1);
+ LOG.error("ZooKeeper client connection to the ZooKeeper server has expired!");
break;
default:
// do nothing
break;
}
+ // notify the child watchers
+ notifyEvent(event);
}
/**
@@ -80,8 +104,7 @@ public class ZooKeeperWatcherBase implem
* @throws InterruptedException
* interrupted while waiting for connection
*/
- public void waitForConnection() throws KeeperException,
- InterruptedException {
+ public void waitForConnection() throws KeeperException, InterruptedException {
if (!clientConnectLatch.await(zkSessionTimeOut, TimeUnit.MILLISECONDS)) {
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
@@ -94,4 +117,17 @@ public class ZooKeeperWatcherBase implem
return zkSessionTimeOut;
}
+ /**
+ * Notify Event to child watchers.
+ *
+ * @param event
+ * Watched event received from ZooKeeper.
+ */
+ private void notifyEvent(WatchedEvent event) {
+ // notify child watchers
+ for (Watcher w : childWatchers) {
+ w.process(event);
+ }
+ }
+
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java?rev=1618732&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java Mon Aug 18 21:00:15 2014
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a mechanism to perform an operation on ZooKeeper that is safe on disconnections
+ * and recoverable errors.
+ */
+class ZooWorker {
+
+ static final Logger logger = LoggerFactory.getLogger(ZooWorker.class);
+
+ int attempts = 0;
+ long startTimeMs;
+ long elapsedTimeMs = 0L;
+ final RetryPolicy retryPolicy;
+
+ ZooWorker(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ this.startTimeMs = MathUtils.now();
+ }
+
+ public boolean allowRetry(int rc) {
+ if (!ZooWorker.isRecoverableException(rc)) {
+ return false;
+ }
+ ++attempts;
+ elapsedTimeMs = MathUtils.now() - startTimeMs;
+ return retryPolicy.allowRetry(attempts, elapsedTimeMs);
+ }
+
+ public long nextRetryWaitTime() {
+ return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs);
+ }
+
+ /**
+ * Check whether the given result code is recoverable by retry.
+ *
+ * @param rc result code
+ * @return true if given result code is recoverable.
+ */
+ public static boolean isRecoverableException(int rc) {
+ return KeeperException.Code.CONNECTIONLOSS.intValue() == rc ||
+ KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc ||
+ KeeperException.Code.SESSIONMOVED.intValue() == rc ||
+ KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
+ }
+
+ /**
+ * Check whether the given exception is recoverable by retry.
+ *
+ * @param exception given exception
+ * @return true if given exception is recoverable.
+ */
+ public static boolean isRecoverableException(KeeperException exception) {
+ return isRecoverableException(exception.code().intValue());
+ }
+
+ static interface ZooCallable<T> {
+ /**
+ * Be compatible with ZooKeeper interface.
+ *
+ * @return value
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public T call() throws InterruptedException, KeeperException;
+ }
+
+ /**
+ * Execute a sync zookeeper operation with a given retry policy.
+ *
+ * @param client
+ * ZooKeeper client.
+ * @param proc
+ * Synchronous zookeeper operation wrapped in a {@link Callable}.
+ * @param retryPolicy
+ * Retry policy to execute the synchronous operation.
+ * @return result of the zookeeper operation
+ * @throws KeeperException any non-recoverable exception or recoverable exception exhausted all retires.
+ * @throws InterruptedException the operation is interrupted.
+ */
+ public static<T> T syncCallWithRetries(
+ ZooKeeperClient client, ZooCallable<T> proc, RetryPolicy retryPolicy)
+ throws KeeperException, InterruptedException {
+ T result = null;
+ boolean isDone = false;
+ int attempts = 0;
+ long startTimeMs = MathUtils.now();
+ while (!isDone) {
+ try {
+ if (null != client) {
+ client.waitForConnection();
+ }
+ logger.debug("Execute {} at {} retry attempt.", proc, attempts);
+ result = proc.call();
+ isDone = true;
+ } catch (KeeperException e) {
+ ++attempts;
+ boolean rethrow = true;
+ long elapsedTime = MathUtils.now() - startTimeMs;
+ if (((null != client && isRecoverableException(e)) || null == client) &&
+ retryPolicy.allowRetry(attempts, elapsedTime)) {
+ rethrow = false;
+ }
+ if (rethrow) {
+ logger.debug("Stopped executing {} after {} attempts.", proc, attempts);
+ throw e;
+ }
+ TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, elapsedTime));
+ }
+ }
+ return result;
+ }
+
+ static<T> T syncCallWithRetries(
+ ZooCallable<T> proc, RetryPolicy retryPolicy) throws KeeperException, InterruptedException {
+ return syncCallWithRetries(null, proc, retryPolicy);
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1618732&r1=1618731&r2=1618732&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Mon Aug 18 21:00:15 2014
@@ -80,14 +80,23 @@ public class ZooKeeperUtil {
ZkTmpDir.delete();
ZkTmpDir.mkdir();
+ // start the server and client.
+ restartServer();
+
+ // initialize the zk client with values
+ zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ public void restartServer() throws Exception {
zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir,
- ZooKeeperServer.DEFAULT_TICK_TIME);
+ ZooKeeperServer.DEFAULT_TICK_TIME);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(zkaddr, 100);
serverFactory.startup(zks);
boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(),
- ClientBase.CONNECTION_TIMEOUT);
+ ClientBase.CONNECTION_TIMEOUT);
LOG.debug("Server up: " + b);
// create a zookeeper client
@@ -95,10 +104,6 @@ public class ZooKeeperUtil {
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
zkc = ZkUtils.createConnectedZookeeperClient(
getZooKeeperConnectString(), w);
-
- // initialize the zk client with values
- zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public void sleepServer(final int seconds, final CountDownLatch l)
@@ -137,7 +142,7 @@ public class ZooKeeperUtil {
zk2.close();
}
- public void killServer() throws Exception {
+ public void stopServer() throws Exception {
if (zkc != null) {
zkc.close();
}
@@ -146,12 +151,16 @@ public class ZooKeeperUtil {
if (serverFactory != null) {
serverFactory.shutdown();
assertTrue("waiting for server down",
- ClientBase.waitForServerDown(getZooKeeperConnectString(),
- ClientBase.CONNECTION_TIMEOUT));
+ ClientBase.waitForServerDown(getZooKeeperConnectString(),
+ ClientBase.CONNECTION_TIMEOUT));
}
if (zks != null) {
zks.getTxnLogFactory().close();
}
+ }
+
+ public void killServer() throws Exception {
+ stopServer();
// ServerStats.unregister();
FileUtils.deleteDirectory(ZkTmpDir);
}