You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/10/20 03:05:43 UTC
[02/16] git commit: ACCUMULO-3242 Add proper retry logic to ZooReader,
ZooReaderWriter and ZooUtil removing the RetryingInstance.
ACCUMULO-3242 Add proper retry logic to ZooReader, ZooReaderWriter and ZooUtil removing the RetryingInstance.
The RetryingInvocationHandler was a nice way to encapsulate retry logic
around ZooKeeper calls; however, it fails when you actually need to do
additional handling of the KeeperException. Removing the retrying/non-retrying
logic also simplifies the methods that clients interact with.
ZooUtil still has some unaddressed issues, but I left some TODOs
in place to mark what still needs to be fixed, usually with respect to
session expirations.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c023f74e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c023f74e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c023f74e
Branch: refs/heads/1.6
Commit: c023f74e4329b8c764c46f9468681b71762e976a
Parents: 6ad16a9
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 17 01:22:15 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun Oct 19 20:15:59 2014 -0400
----------------------------------------------------------------------
.../accumulo/fate/zookeeper/IZooReader.java | 20 +-
.../apache/accumulo/fate/zookeeper/Retry.java | 74 +++++
.../accumulo/fate/zookeeper/RetryFactory.java | 36 +++
.../zookeeper/RetryingInvocationHandler.java | 63 ----
.../accumulo/fate/zookeeper/ZooQueueLock.java | 20 +-
.../accumulo/fate/zookeeper/ZooReader.java | 133 +++++++-
.../fate/zookeeper/ZooReaderWriter.java | 129 +++++---
.../accumulo/fate/zookeeper/ZooSession.java | 42 +--
.../apache/accumulo/fate/zookeeper/ZooUtil.java | 301 ++++++++++++++-----
.../RetryingInvocationHandlerTest.java | 87 ------
.../org/apache/accumulo/server/Accumulo.java | 2 +-
.../accumulo/server/master/LiveTServerSet.java | 2 +-
.../security/handler/ZKAuthenticator.java | 42 +--
.../server/security/handler/ZKAuthorizor.java | 8 +-
.../server/security/handler/ZKPermHandler.java | 42 +--
.../accumulo/server/tables/TableManager.java | 16 +-
.../server/tablets/UniqueNameAllocator.java | 2 +-
.../accumulo/server/zookeeper/ZooQueueLock.java | 8 +-
.../server/zookeeper/ZooReaderWriter.java | 45 ---
.../zookeeper/ZooReaderWriterFactory.java | 22 --
.../java/org/apache/accumulo/master/Master.java | 4 +-
.../master/tableOps/CancelCompactions.java | 34 +--
.../accumulo/master/tableOps/CompactRange.java | 6 +-
.../master/tableOps/RenameNamespace.java | 2 +-
.../accumulo/master/tableOps/RenameTable.java | 2 +-
.../apache/accumulo/master/tableOps/Utils.java | 12 +-
.../apache/accumulo/master/util/FateAdmin.java | 2 +-
.../org/apache/accumulo/tserver/Tablet.java | 10 +-
28 files changed, 688 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
index 0610e79..610b1bd 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
@@ -23,21 +23,23 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
public interface IZooReader {
-
+
byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException;
-
+
+ byte[] getData(String zPath, boolean watch, Stat stat) throws KeeperException, InterruptedException;
+
Stat getStatus(String zPath) throws KeeperException, InterruptedException;
-
+
Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-
+
List<String> getChildren(String zPath) throws KeeperException, InterruptedException;
-
+
List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-
+
boolean exists(String zPath) throws KeeperException, InterruptedException;
-
+
boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-
+
void sync(final String path) throws KeeperException, InterruptedException;
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
new file mode 100644
index 0000000..4a37172
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import org.apache.log4j.Logger;
+
+/**
+ *
+ */
+public class Retry {
+ private static final Logger log = Logger.getLogger(Retry.class);
+
+ private final long maxRetries, maxWait, waitIncrement;
+ private long retriesDone, currentWait;
+
+ /**
+ * @param maxRetries
+ * Maximum times to retry
+ * @param startWait
+ * The amount of time (ms) to wait for the initial retry
+ * @param maxWait
+ * The maximum wait (ms)
+ * @param waitIncrement
+ * The amount of time (ms) to increment next wait time by
+ */
+ public Retry(long maxRetries, long startWait, long maxWait, long waitIncrement) {
+ this.maxRetries = maxRetries;
+ this.maxWait = maxWait;
+ this.waitIncrement = waitIncrement;
+ this.retriesDone = 0l;
+ this.currentWait = startWait;
+ }
+
+ public boolean canRetry() {
+ return retriesDone < maxRetries;
+ }
+
+ public void useRetry() {
+ if (!canRetry()) {
+ throw new IllegalStateException("No retries left");
+ }
+
+ retriesDone++;
+ }
+
+ public boolean hasRetried() {
+ return retriesDone > 0;
+ }
+
+ public long retriesCompleted() {
+ return retriesDone;
+ }
+
+ public void waitForNextAttempt() throws InterruptedException {
+ log.debug("Sleeping for " + currentWait + "ms before retrying operation");
+ Thread.sleep(currentWait);
+ currentWait = Math.min(maxWait, currentWait + waitIncrement);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
new file mode 100644
index 0000000..3fcb738
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+/**
+ *
+ */
+public class RetryFactory {
+
+ private final long maxRetries, startWait, maxWait, waitIncrement;
+
+ public RetryFactory(long maxRetries, long startWait, long maxWait, long waitIncrement) {
+ this.maxRetries = maxRetries;
+ this.startWait = startWait;
+ this.maxWait = maxWait;
+ this.waitIncrement = waitIncrement;
+ }
+
+ public Retry create() {
+ return new Retry(maxRetries, startWait, maxWait, waitIncrement);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
deleted file mode 100644
index 4597036..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * An invocation handler for ZooKeeper reader/writers that retries calls that fail due to connection loss.
- */
-public class RetryingInvocationHandler implements InvocationHandler {
- private final IZooReaderWriter zrw;
-
- /**
- * Creates a new invocation handler.
- *
- * @param zrw
- * ZooKeeper reader/writer being handled
- */
- public RetryingInvocationHandler(IZooReaderWriter zrw) {
- this.zrw = zrw;
- }
-
- private static final long INITIAL_RETRY_TIME = 250L;
- private static final long RETRY_INCREMENT = 250L;
- private static final long MAXIMUM_RETRY_TIME = 5000L;
-
- @Override
- public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
- long retryTime = INITIAL_RETRY_TIME;
- while (true) {
- try {
- return method.invoke(zrw, args);
- } catch (InvocationTargetException e) {
- if (e.getCause() instanceof KeeperException.ConnectionLossException) {
- Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause());
- UtilWaitThread.sleep(retryTime);
- retryTime = Math.min(MAXIMUM_RETRY_TIME, retryTime + RETRY_INCREMENT);
- } else {
- throw e.getCause();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
index f30cda3..f9195f3 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
@@ -29,26 +29,26 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
public class ZooQueueLock implements QueueLock {
-
+
private static final String PREFIX = "lock-";
-
+
// private static final Logger log = Logger.getLogger(ZooQueueLock.class);
-
+
private IZooReaderWriter zoo;
private String path;
private boolean ephemeral;
-
+
public ZooQueueLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path, boolean ephemeral) throws KeeperException,
InterruptedException {
- this(ZooReaderWriter.getRetryingInstance(zookeepers, timeInMillis, scheme, auth), path, ephemeral);
+ this(ZooReaderWriter.getInstance(zookeepers, timeInMillis, scheme, auth), path, ephemeral);
}
-
+
protected ZooQueueLock(IZooReaderWriter zrw, String path, boolean ephemeral) {
this.zoo = zrw;
this.path = path;
this.ephemeral = ephemeral;
}
-
+
@Override
public long addEntry(byte[] data) {
String newPath;
@@ -72,7 +72,7 @@ public class ZooQueueLock implements QueueLock {
throw new RuntimeException(ex);
}
}
-
+
@Override
public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
@@ -83,7 +83,7 @@ public class ZooQueueLock implements QueueLock {
} catch (KeeperException.NoNodeException ex) {
// the path does not exist (it was deleted or not created yet), that is ok there are no earlier entries then
}
-
+
for (String name : children) {
// this try catch must be done inside the loop because some subset of the children may exist
try {
@@ -100,7 +100,7 @@ public class ZooQueueLock implements QueueLock {
}
return result;
}
-
+
@Override
public void removeEntry(long entry) {
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index 60660d6..994f395 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -28,8 +29,11 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZooReader implements IZooReader {
+ private static final Logger log = Logger.getLogger(ZooReader.class);
+
protected String keepers;
protected int timeout;
+ protected final RetryFactory retryFactory;
protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) {
return ZooSession.getSession(keepers, timeout, scheme, auth);
@@ -39,39 +43,153 @@ public class ZooReader implements IZooReader {
return getSession(keepers, timeout, null, null);
}
+ protected void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+ log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+ if (retry.canRetry()) {
+ retry.useRetry();
+ return;
+ }
+
+ log.error("Retry attempts (" + retry.retriesCompleted() + ") exceeded trying to communicate with ZooKeeper");
+ throw e;
+ }
+
@Override
public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
- return getZooKeeper().getData(zPath, false, stat);
+ return getData(zPath, false, stat);
+ }
+
+ @Override
+ public byte[] getData(String zPath, boolean watch, Stat stat) throws KeeperException, InterruptedException {
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().getData(zPath, watch, stat);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
- return getZooKeeper().exists(zPath, false);
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, false);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
- return getZooKeeper().exists(zPath, watcher);
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, watcher);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
- return getZooKeeper().getChildren(zPath, false);
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().getChildren(zPath, false);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
- return getZooKeeper().getChildren(zPath, watcher);
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().getChildren(zPath, watcher);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public boolean exists(String zPath) throws KeeperException, InterruptedException {
- return getZooKeeper().exists(zPath, false) != null;
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, false) != null;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
- return getZooKeeper().exists(zPath, watcher) != null;
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, watcher) != null;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
@Override
@@ -101,5 +219,6 @@ public class ZooReader implements IZooReader {
public ZooReader(String keepers, int timeout) {
this.keepers = keepers;
this.timeout = timeout;
+ this.retryFactory = new RetryFactory(10l, 250l, 250l, 5000l);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 1f0ae14..b29b88a 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@ -16,31 +16,29 @@
*/
package org.apache.accumulo.fate.zookeeper;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
import java.security.SecurityPermission;
import java.util.Arrays;
import java.util.List;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
-
+ private static final Logger log = Logger.getLogger(ZooReaderWriter.class);
+
private static SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
-
+
private static ZooReaderWriter instance = null;
- private static IZooReaderWriter retryingInstance = null;
private final String scheme;
private final byte[] auth;
-
+
@Override
public ZooKeeper getZooKeeper() {
SecurityManager sm = System.getSecurityManager();
@@ -49,127 +47,158 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
}
return getSession(keepers, timeout, scheme, auth);
}
-
+
public ZooReaderWriter(String string, int timeInMillis, String scheme, byte[] auth) {
super(string, timeInMillis);
this.scheme = scheme;
this.auth = Arrays.copyOf(auth, auth.length);
}
-
+
@Override
public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
}
-
+
@Override
public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy);
}
-
+
/**
* Create a persistent node with the default ACL
- *
+ *
* @return true if the node was created or altered; false if it was skipped
*/
@Override
public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy);
}
-
+
@Override
public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy);
}
-
+
@Override
public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy);
}
-
+
@Override
public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data);
}
-
+
@Override
public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data);
}
-
+
@Override
public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
}
-
+
@Override
public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy);
}
-
+
@Override
public void delete(String path, int version) throws InterruptedException, KeeperException {
- getZooKeeper().delete(path, version);
+ final Retry retry = retryFactory.create();
+ while (true) {
+ try {
+ getZooKeeper().delete(path, version);
+ return;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.NONODE) {
+ if (retry.hasRetried()) {
+ // A retried delete could have deleted the node, assume that was the case
+ log.debug("Delete saw no node on a retry. Assuming node was deleted");
+ return;
+ }
+
+ throw e;
+ } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ // retry if we have more attempts to do so
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
}
-
+
@Override
public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception {
if (createValue != null) {
- try {
- getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
- return createValue;
- } catch (NodeExistsException ex) {
- // expected
+ while (true) {
+ final Retry retry = retryFactory.create();
+ try {
+ getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
+ return createValue;
+ } catch (KeeperException ex) {
+ final Code code = ex.code();
+ if (code == Code.NODEEXISTS) {
+ // expected
+ break;
+ } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, ex);
+ } else {
+ throw ex;
+ }
+ }
+
+ retry.waitForNextAttempt();
}
}
do {
+ final Retry retry = retryFactory.create();
Stat stat = new Stat();
- byte[] data = getZooKeeper().getData(zPath, false, stat);
+ byte[] data = getData(zPath, false, stat);
data = mutator.mutate(data);
if (data == null)
return data;
try {
getZooKeeper().setData(zPath, data, stat.getVersion());
return data;
- } catch (BadVersionException ex) {
- //
+ } catch (KeeperException ex) {
+ final Code code = ex.code();
+ if (code == Code.BADVERSION) {
+ // Retry, but don't increment. This makes it backwards compatible with the infinite
+ // loop that previously happened. I'm not sure if that's really desirable though.
+ } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, ex);
+ retry.waitForNextAttempt();
+ } else {
+ throw ex;
+ }
}
} while (true);
}
-
+
public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
if (instance == null)
instance = new ZooReaderWriter(zookeepers, timeInMillis, scheme, auth);
return instance;
}
-
- /**
- * get an instance that retries when zookeeper connection errors occur
- *
- * @return an instance that retries when Zookeeper connection errors occur.
- */
- public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
-
- if (retryingInstance == null) {
- IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth);
- InvocationHandler ih = new RetryingInvocationHandler(inst);
- retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
- }
-
- return retryingInstance;
- }
-
+
@Override
public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException {
return ZooUtil.isLockHeld(getZooKeeper(), lockID);
}
-
+
@Override
public void mkdirs(String path) throws KeeperException, InterruptedException {
if (path.equals(""))
return;
if (!path.startsWith("/"))
throw new IllegalArgumentException(path + "does not start with /");
- if (getZooKeeper().exists(path, false) != null)
+ if (exists(path))
return;
String parent = path.substring(0, path.lastIndexOf("/"));
mkdirs(parent);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 33bd77b..b6890c6 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.ZooKeeper.States;
public class ZooSession {
private static final Charset UTF8 = Charset.forName("UTF-8");
-
+
public static class ZooSessionShutdownException extends RuntimeException {
private static final long serialVersionUID = 1L;
@@ -41,31 +41,32 @@ public class ZooSession {
}
private static final Logger log = Logger.getLogger(ZooSession.class);
-
+
private static class ZooSessionInfo {
public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
this.zooKeeper = zooKeeper;
}
-
+
ZooKeeper zooKeeper;
}
-
+
private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
-
+
private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth, UTF8));
}
-
+
private static class ZooWatcher implements Watcher {
-
+
+ @Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
log.debug("Session expired, state of current session : " + event.getState());
}
}
-
+
}
-
+
/**
* @param host comma separated list of zk servers
* @param timeout in milliseconds
@@ -79,9 +80,9 @@ public class ZooSession {
boolean tryAgain = true;
long sleepTime = 100;
ZooKeeper zooKeeper = null;
-
+
long startTime = System.currentTimeMillis();
-
+
while (tryAgain) {
try {
zooKeeper = new ZooKeeper(host, timeout, watcher);
@@ -94,7 +95,7 @@ public class ZooSession {
} else
UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
}
-
+
} catch (IOException e) {
if (e instanceof UnknownHostException) {
/*
@@ -116,13 +117,13 @@ public class ZooSession {
if (System.currentTimeMillis() - startTime > 2 * timeout) {
throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
}
-
+
if (tryAgain) {
if (startTime + 2 * timeout < System.currentTimeMillis() + sleepTime + connectTimeWait)
sleepTime = startTime + 2 * timeout - System.currentTimeMillis() - connectTimeWait;
if (sleepTime < 0)
{
- connectTimeWait -= sleepTime;
+ connectTimeWait -= sleepTime;
sleepTime = 0;
}
UtilWaitThread.sleep(sleepTime);
@@ -130,31 +131,32 @@ public class ZooSession {
sleepTime = sleepTime + (long)(sleepTime * Math.random());
}
}
-
+
return zooKeeper;
}
-
+
public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
return getSession(zooKeepers, timeout, null, null);
}
-
+
public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
-
+
if (sessions == null)
throw new ZooSessionShutdownException();
String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
-
+
// a read-only session can use a session with authorizations, so cache a copy for it w/out auths
String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
ZooSessionInfo zsi = sessions.get(sessionKey);
if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+ log.debug("Removing closed ZooKeeper session to " + zooKeepers);
if (auth != null && sessions.get(readOnlySessionKey) == zsi)
sessions.remove(readOnlySessionKey);
zsi = null;
sessions.remove(sessionKey);
}
-
+
if (zsi == null) {
ZooWatcher watcher = new ZooWatcher();
log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index 5b856c0..e0d8831 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@ -21,12 +21,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
@@ -34,27 +33,29 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
public class ZooUtil {
+ private static final Logger log = Logger.getLogger(ZooUtil.class);
+
public enum NodeExistsPolicy {
SKIP, OVERWRITE, FAIL
}
-
+
public enum NodeMissingPolicy {
SKIP, CREATE, FAIL
}
-
+
public static class LockID {
public long eid;
public String path;
public String node;
-
+
public LockID(String root, String serializedLID) {
String sa[] = serializedLID.split("\\$");
int lastSlash = sa[0].lastIndexOf('/');
-
+
if (sa.length != 2 || lastSlash < 0) {
throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
}
-
+
if (lastSlash == 0)
path = root;
else
@@ -62,37 +63,50 @@ public class ZooUtil {
node = sa[0].substring(lastSlash + 1);
eid = new BigInteger(sa[1], 16).longValue();
}
-
+
public LockID(String path, String node, long eid) {
this.path = path;
this.node = node;
this.eid = eid;
}
-
+
public String serialize(String root) {
-
+
return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
}
-
+
@Override
public String toString() {
return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
}
}
-
+
public static final List<ACL> PRIVATE;
public static final List<ACL> PUBLIC;
+ private static final RetryFactory RETRY_FACTORY;
static {
PRIVATE = new ArrayList<ACL>();
PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
PUBLIC = new ArrayList<ACL>();
PUBLIC.addAll(PRIVATE);
PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+ RETRY_FACTORY = new RetryFactory(10l, 250l, 250l, 5000l);
}
-
+
+ /**
+ * Handles a (possibly) transient ZooKeeper failure: logs it, then either consumes one attempt from {@code retry} and returns so the caller can
+ * try again, or rethrows {@code e} once {@code retry} reports that no attempts remain.
+ *
+ * @param retry
+ * retry state for the operation being attempted
+ * @param e
+ * the failure that was observed
+ * @throws KeeperException
+ * the given exception, when {@code retry.canRetry()} is false
+ */
+ protected static void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+ log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+ if (!retry.canRetry()) {
+ // Out of attempts: report how many were made and surface the original failure
+ log.error("Retry attempts (" + retry.retriesCompleted() + ") exceeded trying to communicate with ZooKeeper");
+ throw e;
+ }
+
+ retry.useRetry();
+ }
+
/**
* This method will delete a node and all its children from zookeeper
- *
+ *
* @param zPath
* the path to delete
*/
@@ -100,82 +114,180 @@ public class ZooUtil {
if (policy.equals(NodeMissingPolicy.CREATE))
throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
try {
- for (String child : zk.getChildren(zPath, false))
+ List<String> children;
+ final Retry retry = RETRY_FACTORY.create();
+ while (true) {
+ try {
+ children = zk.getChildren(zPath, false);
+ break;
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+ retry.waitForNextAttempt();
+ }
+ for (String child : children)
recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
-
+
Stat stat;
- if ((stat = zk.exists(zPath, null)) != null)
- zk.delete(zPath, stat.getVersion());
+ while (true) {
+ try {
+ stat = zk.exists(zPath, null);
+ // Node exists
+ if (stat != null) {
+ try {
+ // Try to delete it
+ zk.delete(zPath, stat.getVersion());
+ return;
+ } catch (NoNodeException e) {
+ // If the node is gone now, it's ok if we have SKIP
+ if (policy.equals(NodeMissingPolicy.SKIP)) {
+ return;
+ }
+ throw e;
+ }
+ // Let other KeeperException bubble to the outer catch
+ }
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
} catch (KeeperException e) {
if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
return;
throw e;
}
}
-
+
public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
recursiveDelete(zk, zPath, -1, policy);
}
-
+
/**
* Create a persistent node with the default ACL
- *
+ *
* @return true if the node was created or altered; false if it was skipped
*/
public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
}
-
+
public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
InterruptedException {
return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
}
-
+
public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
throws KeeperException, InterruptedException {
return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
}
-
+
+ /**
+ * Creates the node at {@code zPath} with the given data, ACLs and create mode, applying {@code policy} when the node already exists.
+ * CONNECTIONLOSS, OPERATIONTIMEOUT and SESSIONEXPIRED failures from either create() or setData() are retried via retryOrThrow(); every other
+ * failure is rethrown to the caller.
+ *
+ * @param version
+ * version handed to setData() when an existing node is overwritten
+ * @param policy
+ * what to do when the node already exists; null is treated as FAIL
+ * @return true if the node was created or altered; false if it was skipped
+ */
private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
throws KeeperException, InterruptedException {
if (policy == null)
policy = NodeExistsPolicy.FAIL;
-
+
+ final Retry retry = RETRY_FACTORY.create();
while (true) {
try {
zk.create(zPath, data, acls, mode);
return true;
- } catch (NodeExistsException nee) {
- switch (policy) {
- case SKIP:
- return false;
- case OVERWRITE:
- try {
- zk.setData(zPath, data, version);
- return true;
- } catch (NoNodeException nne) {
- // node delete between create call and set data, so try create call again
- continue;
- }
- default:
- throw nee;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.NODEEXISTS) {
+ switch (policy) {
+ case SKIP:
+ return false;
+ case OVERWRITE:
+ // overwrite the data in the node when it already exists
+ try {
+ zk.setData(zPath, data, version);
+ return true;
+ } catch (KeeperException e2) {
+ // Inspect the setData() failure (e2/code2), not the NODEEXISTS failure from create()
+ final Code code2 = e2.code();
+ if (code2 == Code.NONODE) {
+ // node delete between create call and set data, so try create call again
+ continue;
+ } else if (code2 == Code.CONNECTIONLOSS || code2 == Code.OPERATIONTIMEOUT || code2 == Code.SESSIONEXPIRED) {
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(retry, e2);
+ // Leave the switch so we wait and retry instead of falling through to the default case
+ break;
+ } else {
+ // unhandled exception on setData()
+ throw e2;
+ }
+ }
+ default:
+ throw e;
+ }
+ } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(retry, e);
+ } else {
+ // unhandled exception on create()
+ throw e;
}
}
+
+ // Catch all to wait before retrying
+ retry.waitForNextAttempt();
}
}
-
+
+ /**
+ * Reads the data stored at {@code zPath} (no watch is set), retrying CONNECTIONLOSS, OPERATIONTIMEOUT and SESSIONEXPIRED failures via
+ * retryOrThrow(). Any other KeeperException is rethrown immediately.
+ *
+ * @param stat
+ * passed through to ZooKeeper.getData()
+ * @return the value returned by ZooKeeper.getData()
+ */
public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
- return zk.getData(zPath, false, stat);
+ final Retry attempts = RETRY_FACTORY.create();
+ for (;;) {
+ try {
+ return zk.getData(zPath, false, stat);
+ } catch (KeeperException failure) {
+ final Code reason = failure.code();
+ final boolean retryable = reason == Code.CONNECTIONLOSS || reason == Code.OPERATIONTIMEOUT || reason == Code.SESSIONEXPIRED;
+ if (!retryable) {
+ throw failure;
+ }
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(attempts, failure);
+ }
+
+ // Wait before the next attempt
+ attempts.waitForNextAttempt();
+ }
}
-
+
+ /**
+ * Looks up the Stat of {@code zPath} through ZooKeeper.exists() (no watch is set), retrying CONNECTIONLOSS, OPERATIONTIMEOUT and
+ * SESSIONEXPIRED failures via retryOrThrow(). Any other KeeperException is rethrown immediately.
+ *
+ * @return the value returned by ZooKeeper.exists()
+ */
public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
- return zk.exists(zPath, false);
+ final Retry attempts = RETRY_FACTORY.create();
+ for (;;) {
+ try {
+ return zk.exists(zPath, false);
+ } catch (KeeperException failure) {
+ final Code reason = failure.code();
+ final boolean retryable = reason == Code.CONNECTIONLOSS || reason == Code.OPERATIONTIMEOUT || reason == Code.SESSIONEXPIRED;
+ if (!retryable) {
+ throw failure;
+ }
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(attempts, failure);
+ }
+
+ // Wait before the next attempt
+ attempts.waitForNextAttempt();
+ }
}
-
+
public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
return getStatus(zk, zPath) != null;
}
-
+
public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
InterruptedException {
Stat stat = null;
@@ -192,9 +304,10 @@ public class ZooUtil {
throw KeeperException.create(Code.NODEEXISTS, source);
}
}
-
+
stat = new Stat();
- byte[] data = zk.getData(source, false, stat);
+ byte[] data = getData(zk, source, stat);
+
if (stat.getEphemeralOwner() == 0) {
if (data == null)
throw KeeperException.create(Code.NONODE, source);
@@ -204,61 +317,113 @@ public class ZooUtil {
recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
}
}
-
+
public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
}
-
+
+ /**
+ * Creates a PERSISTENT_SEQUENTIAL node under the PUBLIC ACL, retrying CONNECTIONLOSS, OPERATIONTIMEOUT and SESSIONEXPIRED failures via
+ * retryOrThrow(). Any other KeeperException is rethrown immediately.
+ *
+ * NOTE(review): if a create() succeeded on the server but its reply was lost, the retry would presumably create a second sequential node;
+ * confirm callers tolerate that.
+ *
+ * @return the value returned by ZooKeeper.create()
+ */
public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
- return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+ final Retry attempts = RETRY_FACTORY.create();
+ for (;;) {
+ try {
+ return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+ } catch (KeeperException failure) {
+ final Code reason = failure.code();
+ final boolean retryable = reason == Code.CONNECTIONLOSS || reason == Code.OPERATIONTIMEOUT || reason == Code.SESSIONEXPIRED;
+ if (!retryable) {
+ throw failure;
+ }
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(attempts, failure);
+ }
+
+ // Wait before the next attempt
+ attempts.waitForNextAttempt();
+ }
}
-
+
+ /**
+ * Creates an EPHEMERAL node under the PUBLIC ACL, retrying CONNECTIONLOSS, OPERATIONTIMEOUT and SESSIONEXPIRED failures via
+ * retryOrThrow(). Any other KeeperException is rethrown immediately.
+ *
+ * NOTE(review): a retry after a create() that actually succeeded server-side would presumably surface as NODEEXISTS, which is rethrown here;
+ * confirm callers expect that.
+ *
+ * @return the value returned by ZooKeeper.create()
+ */
public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
- return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+ final Retry attempts = RETRY_FACTORY.create();
+ for (;;) {
+ try {
+ return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+ } catch (KeeperException failure) {
+ final Code reason = failure.code();
+ final boolean retryable = reason == Code.CONNECTIONLOSS || reason == Code.OPERATIONTIMEOUT || reason == Code.SESSIONEXPIRED;
+ if (!retryable) {
+ throw failure;
+ }
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(attempts, failure);
+ }
+
+ // Wait before the next attempt
+ attempts.waitForNextAttempt();
+ }
}
+ /**
+ * Creates an EPHEMERAL_SEQUENTIAL node under the PUBLIC ACL, retrying CONNECTIONLOSS, OPERATIONTIMEOUT and SESSIONEXPIRED failures via
+ * retryOrThrow(). Any other KeeperException is rethrown immediately.
+ *
+ * NOTE(review): if a create() succeeded on the server but its reply was lost, the retry would presumably create a second sequential node;
+ * confirm callers tolerate that.
+ *
+ * @return the value returned by ZooKeeper.create()
+ */
public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
- return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+ final Retry attempts = RETRY_FACTORY.create();
+ for (;;) {
+ try {
+ return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+ } catch (KeeperException failure) {
+ final Code reason = failure.code();
+ final boolean retryable = reason == Code.CONNECTIONLOSS || reason == Code.OPERATIONTIMEOUT || reason == Code.SESSIONEXPIRED;
+ if (!retryable) {
+ throw failure;
+ }
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(attempts, failure);
+ }
+
+ // Wait before the next attempt
+ attempts.waitForNextAttempt();
+ }
}
-
+
public static byte[] getLockData(ZooCache zc, String path) {
-
+
List<String> children = zc.getChildren(path);
-
+
if (children == null || children.size() == 0) {
return null;
}
-
+
children = new ArrayList<String>(children);
Collections.sort(children);
-
+
String lockNode = children.get(0);
-
+
return zc.get(path + "/" + lockNode);
}
-
+
+ /**
+ * Checks whether the lock described by {@code lid} is currently held: the first child of {@code lid.path} in sorted order must be
+ * {@code lid.node}, and that node must exist with an ephemeral owner equal to {@code lid.eid}. CONNECTIONLOSS, OPERATIONTIMEOUT and
+ * SESSIONEXPIRED failures are retried via retryOrThrow(); every other KeeperException is rethrown to the caller.
+ *
+ * @return true if the lock node is first in line and owned by {@code lid.eid}; false otherwise
+ */
public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
-
+ final Retry retry = RETRY_FACTORY.create();
while (true) {
try {
+ // TODO push down retry to getChildren and exists
List<String> children = zk.getChildren(lid.path, false);
-
+
if (children.size() == 0) {
return false;
}
-
+
Collections.sort(children);
-
+
String lockNode = children.get(0);
if (!lid.node.equals(lockNode))
return false;
-
+
Stat stat = zk.exists(lid.path + "/" + lid.node, false);
return stat != null && stat.getEphemeralOwner() == lid.eid;
- } catch (KeeperException.ConnectionLossException ex) {
- UtilWaitThread.sleep(1000);
+ } catch (KeeperException ex) {
+ final Code c = ex.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+ retryOrThrow(retry, ex);
+ } else {
+ // Not a transient failure: propagate it rather than swallowing it and looping forever
+ throw ex;
+ }
}
+
+ retry.waitForNextAttempt();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
deleted file mode 100644
index 0613a1f..0000000
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.lang.reflect.Method;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.easymock.EasyMock.aryEq;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-public class RetryingInvocationHandlerTest {
- private static final String PATH = "/path/to/somewhere";
- private static final byte[] DATA = {(byte) 1, (byte) 2};
- private static final Object[] ARGS = {PATH, DATA};
- private static final String RV = "OK";
-
- private static Method putMethod;
-
- @BeforeClass
- public static void setUpClass() throws Exception {
- putMethod = IZooReaderWriter.class.getMethod("putEphemeralData", String.class, byte[].class);
- }
-
- private IZooReaderWriter zrw;
- private RetryingInvocationHandler ih;
-
- @Before
- public void setUp() throws Exception {
- zrw = createMock(IZooReaderWriter.class);
- ih = new RetryingInvocationHandler(zrw);
- }
-
- @Test
- public void testInvokeSuccessful() throws Throwable {
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
- replay(zrw);
- Object rv = ih.invoke(null, putMethod, ARGS);
- verify(zrw);
- assertEquals(RV, rv);
- }
-
- @Test
- public void testInvokeRetrySuccessful() throws Throwable {
- ConnectionLossException e = createMock(ConnectionLossException.class);
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
- replay(zrw);
- Object rv = ih.invoke(null, putMethod, ARGS);
- verify(zrw);
- assertEquals(RV, rv);
- }
-
- @Test(expected = InterruptedException.class)
- public void testInvokeRetryFailure() throws Throwable {
- ConnectionLossException e = createMock(ConnectionLossException.class);
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
- expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(new InterruptedException());
- replay(zrw);
- try {
- ih.invoke(null, putMethod, ARGS);
- } finally {
- verify(zrw);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index ca70efe..6ba05cc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -301,7 +301,7 @@ public class Accumulo {
public static void abortIfFateTransactions() {
try {
final ReadOnlyTStore<Accumulo> fate = new ReadOnlyStore<Accumulo>(new ZooStore<Accumulo>(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZFATE,
- ZooReaderWriter.getRetryingInstance()));
+ ZooReaderWriter.getInstance()));
if (!(fate.list().isEmpty())) {
throw new AccumuloException("Aborting upgrade because there are outstanding FATE transactions from a previous Accumulo version. Please see the README document for instructions on what to do under your previous version.");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 63bd894..e8d5bbf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -388,7 +388,7 @@ public class LiveTServerSet implements Watcher {
log.info("Removing zookeeper lock for " + server);
String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
try {
- ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
+ ZooReaderWriter.getInstance().recursiveDelete(fullpath, SKIP);
} catch (Exception e) {
String msg = "error removing tablet server lock";
log.fatal(msg, e);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
index 1646a28..5f6ff71 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
@@ -39,40 +39,40 @@ import org.apache.zookeeper.KeeperException;
public final class ZKAuthenticator implements Authenticator {
static final Logger log = Logger.getLogger(ZKAuthenticator.class);
private static Authenticator zkAuthenticatorInstance = null;
-
+
private String ZKUserPath;
private final ZooCache zooCache;
-
+
public static synchronized Authenticator getInstance() {
if (zkAuthenticatorInstance == null)
zkAuthenticatorInstance = new ZKAuthenticator();
return zkAuthenticatorInstance;
}
-
+
public ZKAuthenticator() {
zooCache = new ZooCache();
}
-
+
@Override
public void initialize(String instanceId, boolean initialize) {
ZKUserPath = Constants.ZROOT + "/" + instanceId + "/users";
}
-
+
@Override
public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException {
try {
// remove old settings from zookeeper first, if any
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
synchronized (zooCache) {
zooCache.clear();
if (zoo.exists(ZKUserPath)) {
zoo.recursiveDelete(ZKUserPath, NodeMissingPolicy.SKIP);
log.info("Removed " + ZKUserPath + "/" + " from zookeeper");
}
-
+
// prep parent node of users with root username
zoo.putPersistentData(ZKUserPath, principal.getBytes(Constants.UTF8), NodeExistsPolicy.FAIL);
-
+
constructUser(principal, ZKSecurityTool.createPass(token));
}
} catch (KeeperException e) {
@@ -86,23 +86,23 @@ public final class ZKAuthenticator implements Authenticator {
throw new RuntimeException(e);
}
}
-
+
/**
* Sets up the user in ZK for the provided user. No checking for existence is done here, it should be done before calling.
*/
private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException {
synchronized (zooCache) {
zooCache.clear();
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPrivatePersistentData(ZKUserPath + "/" + user, pass, NodeExistsPolicy.FAIL);
}
}
-
+
@Override
public Set<String> listUsers() {
return new TreeSet<String>(zooCache.getChildren(ZKUserPath));
}
-
+
/**
* Creates a user with no permissions whatsoever
*/
@@ -125,13 +125,13 @@ public final class ZKAuthenticator implements Authenticator {
throw new AccumuloSecurityException(principal, SecurityErrorCode.DEFAULT_SECURITY_ERROR, e);
}
}
-
+
@Override
public void dropUser(String user) throws AccumuloSecurityException {
try {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().recursiveDelete(ZKUserPath + "/" + user, NodeMissingPolicy.FAIL);
+ ZooReaderWriter.getInstance().recursiveDelete(ZKUserPath + "/" + user, NodeMissingPolicy.FAIL);
}
} catch (InterruptedException e) {
log.error(e, e);
@@ -143,7 +143,7 @@ public final class ZKAuthenticator implements Authenticator {
throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
}
}
-
+
@Override
public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (!(token instanceof PasswordToken))
@@ -153,7 +153,7 @@ public final class ZKAuthenticator implements Authenticator {
try {
synchronized (zooCache) {
zooCache.clear(ZKUserPath + "/" + principal);
- ZooReaderWriter.getRetryingInstance().putPrivatePersistentData(ZKUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()),
+ ZooReaderWriter.getInstance().putPrivatePersistentData(ZKUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()),
NodeExistsPolicy.OVERWRITE);
}
} catch (KeeperException e) {
@@ -169,7 +169,7 @@ public final class ZKAuthenticator implements Authenticator {
} else
throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_DOESNT_EXIST); // user doesn't exist
}
-
+
/**
* Checks if a user exists
*/
@@ -177,12 +177,12 @@ public final class ZKAuthenticator implements Authenticator {
public boolean userExists(String user) {
return zooCache.get(ZKUserPath + "/" + user) != null;
}
-
+
@Override
public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
return true;
}
-
+
@Override
public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
if (!(token instanceof PasswordToken))
@@ -199,14 +199,14 @@ public final class ZKAuthenticator implements Authenticator {
}
return result;
}
-
+
@Override
public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
Set<Class<? extends AuthenticationToken>> cs = new HashSet<Class<? extends AuthenticationToken>>();
cs.add(PasswordToken.class);
return cs;
}
-
+
@Override
public boolean validTokenClass(String tokenClass) {
return tokenClass.equals(PasswordToken.class.getName());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
index bbaf592..75b73fc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
@@ -81,7 +81,7 @@ public class ZKAuthorizor implements Authorizor {
@Override
public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
// create the root user with all system privileges, no table privileges, and no record-level authorizations
Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
@@ -110,7 +110,7 @@ public class ZKAuthorizor implements Authorizor {
@Override
public void initUser(String user) throws AccumuloSecurityException {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
@@ -126,7 +126,7 @@ public class ZKAuthorizor implements Authorizor {
public void dropUser(String user) throws AccumuloSecurityException {
try {
synchronized (zooCache) {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP);
zooCache.clear(ZKUserPath + "/" + user);
}
@@ -147,7 +147,7 @@ public class ZKAuthorizor implements Authorizor {
try {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserAuths, ZKSecurityTool.convertAuthorizations(authorizations),
+ ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserAuths, ZKSecurityTool.convertAuthorizations(authorizations),
NodeExistsPolicy.OVERWRITE);
}
} catch (KeeperException e) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
index 1b7e7d3..3cac2e9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
@@ -81,14 +81,14 @@ public class ZKPermHandler implements PermissionHandler {
byte[] serializedPerms;
try {
String path = ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table;
- ZooReaderWriter.getRetryingInstance().sync(path);
- serializedPerms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+ ZooReaderWriter.getInstance().sync(path);
+ serializedPerms = ZooReaderWriter.getInstance().getData(path, null);
} catch (KeeperException e) {
if (e.code() == Code.NONODE) {
// maybe the table was just deleted?
try {
// check for existence:
- ZooReaderWriter.getRetryingInstance().getData(ZKTablePath + "/" + table, null);
+ ZooReaderWriter.getInstance().getData(ZKTablePath + "/" + table, null);
// it's there, you don't have permission
return false;
} catch (InterruptedException ex) {
@@ -129,14 +129,14 @@ public class ZKPermHandler implements PermissionHandler {
byte[] serializedPerms;
try {
String path = ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace;
- ZooReaderWriter.getRetryingInstance().sync(path);
- serializedPerms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+ ZooReaderWriter.getInstance().sync(path);
+ serializedPerms = ZooReaderWriter.getInstance().getData(path, null);
} catch (KeeperException e) {
if (e.code() == Code.NONODE) {
// maybe the namespace was just deleted?
try {
// check for existence:
- ZooReaderWriter.getRetryingInstance().getData(ZKNamespacePath + "/" + namespace, null);
+ ZooReaderWriter.getInstance().getData(ZKNamespacePath + "/" + namespace, null);
// it's there, you don't have permission
return false;
} catch (InterruptedException ex) {
@@ -187,7 +187,7 @@ public class ZKPermHandler implements PermissionHandler {
if (perms.add(permission)) {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
+ ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
NodeExistsPolicy.OVERWRITE);
}
}
@@ -213,7 +213,7 @@ public class ZKPermHandler implements PermissionHandler {
if (tablePerms.add(permission)) {
synchronized (zooCache) {
zooCache.clear(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
NodeExistsPolicy.OVERWRITE);
}
@@ -240,7 +240,7 @@ public class ZKPermHandler implements PermissionHandler {
if (namespacePerms.add(permission)) {
synchronized (zooCache) {
zooCache.clear(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace);
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, ZKSecurityTool.convertNamespacePermissions(namespacePerms),
NodeExistsPolicy.OVERWRITE);
}
@@ -268,7 +268,7 @@ public class ZKPermHandler implements PermissionHandler {
if (sysPerms.remove(permission)) {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
+ ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
NodeExistsPolicy.OVERWRITE);
}
}
@@ -293,7 +293,7 @@ public class ZKPermHandler implements PermissionHandler {
try {
if (tablePerms.remove(permission)) {
zooCache.clear();
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
if (tablePerms.size() == 0)
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
else
@@ -321,7 +321,7 @@ public class ZKPermHandler implements PermissionHandler {
try {
if (namespacePerms.remove(permission)) {
zooCache.clear();
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
if (namespacePerms.size() == 0)
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP);
else
@@ -342,7 +342,7 @@ public class ZKPermHandler implements PermissionHandler {
try {
synchronized (zooCache) {
zooCache.clear();
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
for (String user : zooCache.getChildren(ZKUserPath))
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
}
@@ -360,7 +360,7 @@ public class ZKPermHandler implements PermissionHandler {
try {
synchronized (zooCache) {
zooCache.clear();
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
for (String user : zooCache.getChildren(ZKUserPath))
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP);
}
@@ -375,7 +375,7 @@ public class ZKPermHandler implements PermissionHandler {
@Override
public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
// create the root user with all system privileges, no table privileges, and no record-level authorizations
Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
@@ -412,7 +412,7 @@ public class ZKPermHandler implements PermissionHandler {
@Override
public void initUser(String user) throws AccumuloSecurityException {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms, new byte[0], NodeExistsPolicy.SKIP);
@@ -432,7 +432,7 @@ public class ZKPermHandler implements PermissionHandler {
private void createTablePerm(String user, String table, Set<TablePermission> perms) throws KeeperException, InterruptedException {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
+ ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL);
}
}
@@ -443,7 +443,7 @@ public class ZKPermHandler implements PermissionHandler {
private void createNamespacePerm(String user, String namespace, Set<NamespacePermission> perms) throws KeeperException, InterruptedException {
synchronized (zooCache) {
zooCache.clear();
- ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace,
+ ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace,
ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL);
}
}
@@ -452,7 +452,7 @@ public class ZKPermHandler implements PermissionHandler {
public void cleanUser(String user) throws AccumuloSecurityException {
try {
synchronized (zooCache) {
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP);
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP);
zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP);
@@ -475,8 +475,8 @@ public class ZKPermHandler implements PermissionHandler {
byte[] perms;
try {
String path = ZKUserPath + "/" + user + ZKUserSysPerms;
- ZooReaderWriter.getRetryingInstance().sync(path);
- perms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+ ZooReaderWriter.getInstance().sync(path);
+ perms = ZooReaderWriter.getInstance().getData(path, null);
} catch (KeeperException e) {
if (e.code() == Code.NONODE) {
return false;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index 7a61eb6..71b7155 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@ -63,7 +63,7 @@ public class TableManager {
log.debug("Creating ZooKeeper entries for new namespace " + namespace + " (ID: " + namespaceId + ")");
String zPath = Constants.ZROOT + "/" + instanceId + Constants.ZNAMESPACES + "/" + namespaceId;
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPersistentData(zPath, new byte[0], existsPolicy);
zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(Constants.UTF8), existsPolicy);
zoo.putPersistentData(zPath + Constants.ZNAMESPACE_CONF, new byte[0], existsPolicy);
@@ -76,7 +76,7 @@ public class TableManager {
Pair<String,String> qualifiedTableName = Tables.qualify(tableName);
tableName = qualifiedTableName.getSecond();
String zTablePath = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId;
- IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPersistentData(zTablePath, new byte[0], existsPolicy);
zoo.putPersistentData(zTablePath + Constants.ZTABLE_CONF, new byte[0], existsPolicy);
zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAMESPACE, namespaceId.getBytes(Constants.UTF8), existsPolicy);
@@ -132,7 +132,7 @@ public class TableManager {
String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
try {
- ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() {
+ ZooReaderWriter.getInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() {
@Override
public byte[] mutate(byte[] oldData) throws Exception {
TableState oldState = TableState.UNKNOWN;
@@ -205,13 +205,13 @@ public class TableManager {
String srcTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + srcTable + Constants.ZTABLE_CONF;
String newTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
- ZooReaderWriter.getRetryingInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE);
+ ZooReaderWriter.getInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE);
for (Entry<String,String> entry : propertiesToSet.entrySet())
TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue());
for (String prop : propertiesToExclude)
- ZooReaderWriter.getRetryingInstance().recursiveDelete(
+ ZooReaderWriter.getInstance().recursiveDelete(
Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF + "/" + prop, NodeMissingPolicy.SKIP);
updateTableStateCache(tableId);
@@ -220,9 +220,9 @@ public class TableManager {
public void removeTable(String tableId) throws KeeperException, InterruptedException {
synchronized (tableStateCache) {
tableStateCache.remove(tableId);
- ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
+ ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
NodeMissingPolicy.SKIP);
- ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
+ ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
}
}
@@ -312,7 +312,7 @@ public class TableManager {
}
public void removeNamespace(String namespaceId) throws KeeperException, InterruptedException {
- ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES + "/" + namespaceId, NodeMissingPolicy.SKIP);
+ ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES + "/" + namespaceId, NodeMissingPolicy.SKIP);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
index 4ae8335..eabdc0d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@ -48,7 +48,7 @@ public class UniqueNameAllocator {
final int allocate = 100 + rand.nextInt(100);
try {
- byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
+ byte[] max = ZooReaderWriter.getInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
public byte[] mutate(byte[] currentValue) throws Exception {
long l = Long.parseLong(new String(currentValue, Constants.UTF8), Character.MAX_RADIX);
l += allocate;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
index f7c1e68..b02e5d4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
@@ -24,11 +24,11 @@ import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
import org.apache.zookeeper.KeeperException;
public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLock {
-
+
public ZooQueueLock(String path, boolean ephemeral) throws KeeperException, InterruptedException {
- super(ZooReaderWriter.getRetryingInstance(), path, ephemeral);
+ super(ZooReaderWriter.getInstance(), path, ephemeral);
}
-
+
public static void main(String args[]) throws InterruptedException, KeeperException {
ZooQueueLock lock = new ZooQueueLock("/lock", true);
DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes(Constants.UTF8));
@@ -50,5 +50,5 @@ public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLoc
readLock.lock();
System.out.println("success");
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
index f950077..435591d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@ -16,25 +16,15 @@
*/
package org.apache.accumulo.server.zookeeper;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
private static final String SCHEME = "digest";
private static final String USER = "accumulo";
private static ZooReaderWriter instance = null;
- private static IZooReaderWriter retryingInstance = null;
public ZooReaderWriter(String string, int timeInMillis, String secret) {
super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(Constants.UTF8));
@@ -49,39 +39,4 @@ public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReade
return instance;
}
- /**
- * get an instance that retries when zookeeper connection errors occur
- *
- * @return an instance that retries when Zookeeper connection errors occur.
- */
- public static synchronized IZooReaderWriter getRetryingInstance() {
-
- if (retryingInstance == null) {
- final IZooReaderWriter inst = getInstance();
-
- InvocationHandler ih = new InvocationHandler() {
- @Override
- public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
- long retryTime = 250;
- while (true) {
- try {
- return method.invoke(inst, args);
- } catch (InvocationTargetException e) {
- if (e.getCause() instanceof KeeperException.ConnectionLossException) {
- Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
- UtilWaitThread.sleep(retryTime);
- retryTime = Math.min(5000, retryTime + 250);
- } else {
- throw e.getCause();
- }
- }
- }
- }
- };
-
- retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
- }
-
- return retryingInstance;
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
index 0a9508c..4a7111d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
@@ -16,14 +16,10 @@
*/
package org.apache.accumulo.server.zookeeper;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.RetryingInvocationHandler;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -34,7 +30,6 @@ public class ZooReaderWriterFactory {
private static final String SCHEME = "digest";
private static final String USER = "accumulo";
private static IZooReaderWriter instance = null;
- private static IZooReaderWriter retryingInstance = null;
/**
* Gets a new reader/writer.
@@ -66,21 +61,4 @@ public class ZooReaderWriterFactory {
return instance;
}
}
-
- /**
- * Gets a reader/writer, retrieving ZooKeeper information from the site configuration, and that retries on connection loss. The same instance may be returned
- * for multiple calls.
- *
- * @return retrying reader/writer
- */
- public IZooReaderWriter getRetryingInstance() {
- synchronized (ZooReaderWriterFactory.class) {
- if (retryingInstance == null) {
- IZooReaderWriter inst = getInstance();
- InvocationHandler ih = new RetryingInvocationHandler(inst);
- retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(IZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
- }
- return retryingInstance;
- }
- }
}