You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/06/07 16:41:59 UTC
[accumulo] 01/01: Merge branch '1.9' into 2.0
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 3bdb6cd94d0fbdd88bff6da4aeb682bd4391c847
Merge: c8776a9 3311218
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Jun 7 12:41:25 2019 -0400
Merge branch '1.9' into 2.0
core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java | 4 +---
core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java | 4 ++--
2 files changed, 3 insertions(+), 5 deletions(-)
diff --cc core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index 98e49d3,0000000..0bddb07
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@@ -1,263 -1,0 +1,261 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.fate.util.Retry;
+import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.ZooKeeperConnectionInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooReader implements IZooReader {
+ private static final Logger log = LoggerFactory.getLogger(ZooReader.class);
+
+ protected String keepers;
+ protected int timeout;
+ private final RetryFactory retryFactory;
- private final ZooKeeperConnectionInfo info;
+
+ protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) {
+ return ZooSession.getSession(keepers, timeout, scheme, auth);
+ }
+
+ protected ZooKeeper getZooKeeper() {
+ return getSession(keepers, timeout, null, null);
+ }
+
+ protected RetryFactory getRetryFactory() {
+ return retryFactory;
+ }
+
+ protected void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+ log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+ if (retry.canRetry()) {
+ retry.useRetry();
+ return;
+ }
+
+ log.error("Retry attempts ({}) exceeded trying to communicate with ZooKeeper",
+ retry.retriesCompleted());
+ throw e;
+ }
+
+ @Override
+ public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
+ return getData(zPath, false, stat);
+ }
+
+ @Override
+ public byte[] getData(String zPath, boolean watch, Stat stat)
+ throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().getData(zPath, watch, stat);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public byte[] getData(String zPath, Watcher watcher, Stat stat)
+ throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().getData(zPath, watcher, stat);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, false);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public Stat getStatus(String zPath, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, watcher);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().getChildren(zPath, false);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public List<String> getChildren(String zPath, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().getChildren(zPath, watcher);
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public boolean exists(String zPath) throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, false) != null;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public boolean exists(String zPath, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ final Retry retry = getRetryFactory().createRetry();
+ while (true) {
+ try {
+ return getZooKeeper().exists(zPath, watcher) != null;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ @Override
+ public void sync(final String path) throws KeeperException, InterruptedException {
+ final AtomicInteger rc = new AtomicInteger();
+ final CountDownLatch waiter = new CountDownLatch(1);
+ getZooKeeper().sync(path, (code, arg1, arg2) -> {
+ rc.set(code);
+ waiter.countDown();
+ }, null);
+ waiter.await();
+ Code code = Code.get(rc.get());
+ if (code != KeeperException.Code.OK) {
+ throw KeeperException.create(code);
+ }
+ }
+
+ @Override
+ public List<ACL> getACL(String zPath, Stat stat) throws KeeperException, InterruptedException {
- return ZooUtil.getACL(info, zPath, stat);
++ return ZooUtil.getACL(getZooKeeper(), zPath, stat);
+ }
+
+ public ZooReader(String keepers, int timeout) {
+ this.keepers = keepers;
+ this.timeout = timeout;
+ this.retryFactory = ZooUtil.DEFAULT_RETRY;
- this.info = new ZooKeeperConnectionInfo(keepers, timeout, null, null);
+ }
+}
diff --cc core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index 032986f,0000000..6573614
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@@ -1,617 -1,0 +1,617 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.accumulo.fate.util.Retry;
+import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooUtil {
+
+ public static final RetryFactory DEFAULT_RETRY = Retry.builder().maxRetries(10)
+ .retryAfter(250, MILLISECONDS).incrementBy(250, MILLISECONDS).maxWait(5, TimeUnit.SECONDS)
+ .backOffFactor(1.5).logInterval(3, TimeUnit.MINUTES).createFactory();
+
+ private static final Logger log = LoggerFactory.getLogger(ZooUtil.class);
+
+ public enum NodeExistsPolicy {
+ SKIP, OVERWRITE, FAIL
+ }
+
+ public enum NodeMissingPolicy {
+ SKIP, CREATE, FAIL
+ }
+
+ public static class LockID {
+ public long eid;
+ public String path;
+ public String node;
+
+ public LockID(String root, String serializedLID) {
+ String[] sa = serializedLID.split("\\$");
+ int lastSlash = sa[0].lastIndexOf('/');
+
+ if (sa.length != 2 || lastSlash < 0) {
+ throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+ }
+
+ if (lastSlash == 0)
+ path = root;
+ else
+ path = root + "/" + sa[0].substring(0, lastSlash);
+ node = sa[0].substring(lastSlash + 1);
+ eid = new BigInteger(sa[1], 16).longValue();
+ }
+
+ public LockID(String path, String node, long eid) {
+ this.path = path;
+ this.node = node;
+ this.eid = eid;
+ }
+
+ public String serialize(String root) {
+
+ return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
+ }
+
+ @Override
+ public String toString() {
+ return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
+ }
+ }
+
+ protected static class ZooKeeperConnectionInfo {
+ String keepers, scheme;
+ int timeout;
+ byte[] auth;
+
+ public ZooKeeperConnectionInfo(String keepers, int timeout, String scheme, byte[] auth) {
+ requireNonNull(keepers);
+ this.keepers = keepers;
+ this.timeout = timeout;
+ this.scheme = scheme;
+ this.auth = auth;
+ }
+
+ @Override
+ public int hashCode() {
+ final HashCodeBuilder hcb = new HashCodeBuilder(31, 47);
+ hcb.append(keepers).append(timeout);
+ if (scheme != null) {
+ hcb.append(scheme);
+ }
+ if (auth != null) {
+ hcb.append(auth);
+ }
+ return hcb.toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ZooKeeperConnectionInfo) {
+ ZooKeeperConnectionInfo other = (ZooKeeperConnectionInfo) o;
+ if (!keepers.equals(other.keepers) || timeout != other.timeout) {
+ return false;
+ }
+
+ if (scheme != null) {
+ if (other.scheme == null) {
+ // Ours is non-null, theirs is null
+ return false;
+ } else if (!scheme.equals(other.scheme)) {
+ // Both non-null but not equal
+ return false;
+ }
+ }
+
+ if (auth != null) {
+ if (other.auth == null) {
+ return false;
+ } else {
+ return Arrays.equals(auth, other.auth);
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append("zookeepers=").append(keepers);
+ sb.append(", timeout=").append(timeout);
+ sb.append(", scheme=").append(scheme);
+ sb.append(", auth=").append(auth == null ? "null" : "REDACTED");
+ return sb.toString();
+ }
+ }
+
+ public static final List<ACL> PRIVATE;
+ public static final List<ACL> PUBLIC;
+ private static final RetryFactory RETRY_FACTORY;
+
+ static {
+ PRIVATE = new ArrayList<>();
+ PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
+ PUBLIC = new ArrayList<>();
+ PUBLIC.addAll(PRIVATE);
+ PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+ RETRY_FACTORY = DEFAULT_RETRY;
+ }
+
+ protected static ZooKeeper getZooKeeper(ZooKeeperConnectionInfo info) {
+ return getZooKeeper(info.keepers, info.timeout, info.scheme, info.auth);
+ }
+
+ protected static ZooKeeper getZooKeeper(String keepers, int timeout, String scheme, byte[] auth) {
+ return ZooSession.getSession(keepers, timeout, scheme, auth);
+ }
+
+ protected static void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+ log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+ if (retry.canRetry()) {
+ retry.useRetry();
+ return;
+ }
+
+ log.error("Retry attempts ({}) exceeded trying to communicate with ZooKeeper",
+ retry.retriesCompleted());
+ throw e;
+ }
+
+ /**
+ * This method will delete a node and all its children from zookeeper
+ *
+ * @param zPath
+ * the path to delete
+ */
+ static void recursiveDelete(ZooKeeperConnectionInfo info, String zPath, NodeMissingPolicy policy)
+ throws KeeperException, InterruptedException {
+ if (policy.equals(NodeMissingPolicy.CREATE))
+ throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
+ try {
+ List<String> children;
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ children = getZooKeeper(info).getChildren(zPath, false);
+ break;
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+ retry.waitForNextAttempt();
+ }
+ for (String child : children)
+ recursiveDelete(info, zPath + "/" + child, NodeMissingPolicy.SKIP);
+
+ Stat stat;
+ while (true) {
+ try {
+ stat = getZooKeeper(info).exists(zPath, null);
+ // Node exists
+ if (stat != null) {
+ try {
+ // Try to delete it. We don't care if there was an update to the node
+ // since we got the Stat, just delete all versions (-1).
+ getZooKeeper(info).delete(zPath, -1);
+ return;
+ } catch (NoNodeException e) {
+ // If the node is gone now, it's ok if we have SKIP
+ if (policy.equals(NodeMissingPolicy.SKIP)) {
+ return;
+ }
+ throw e;
+ }
+ // Let other KeeperException bubble to the outer catch
+ } else {
+ // If the stat is null, the node is now gone which is fine.
+ return;
+ }
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ } catch (KeeperException e) {
+ if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
+ return;
+ throw e;
+ }
+ }
+
+ /**
+ * Create a persistent node with the default ACL
+ *
+ * @return true if the node was created or altered; false if it was skipped
+ */
+ public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
+ NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
+ }
+
+ public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
+ int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
+ }
+
+ public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
+ int version, NodeExistsPolicy policy, List<ACL> acls)
+ throws KeeperException, InterruptedException {
+ return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
+ }
+
+ private static boolean putData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
+ CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
+ throws KeeperException, InterruptedException {
+ if (policy == null)
+ policy = NodeExistsPolicy.FAIL;
+
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ getZooKeeper(info).create(zPath, data, acls, mode);
+ return true;
+ } catch (KeeperException e) {
+ final Code code = e.code();
+ if (code == Code.NODEEXISTS) {
+ switch (policy) {
+ case SKIP:
+ return false;
+ case OVERWRITE:
+ // overwrite the data in the node when it already exists
+ try {
+ getZooKeeper(info).setData(zPath, data, version);
+ return true;
+ } catch (KeeperException e2) {
+ final Code code2 = e2.code();
+ if (code2 == Code.NONODE) {
+ // node delete between create call and set data, so try create call again
+ continue;
+ } else if (code2 == Code.CONNECTIONLOSS || code2 == Code.OPERATIONTIMEOUT
+ || code2 == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e2);
+ break;
+ } else {
+ // unhandled exception on setData()
+ throw e2;
+ }
+ }
+ default:
+ throw e;
+ }
+ } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
+ || code == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ // unhandled exception on create()
+ throw e;
+ }
+ }
+
+ // Catch all to wait before retrying
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static byte[] getData(ZooKeeperConnectionInfo info, String zPath, Stat stat)
+ throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ return getZooKeeper(info).getData(zPath, false, stat);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static Stat getStatus(ZooKeeperConnectionInfo info, String zPath)
+ throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ return getZooKeeper(info).exists(zPath, false);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static boolean exists(ZooKeeperConnectionInfo info, String zPath)
+ throws KeeperException, InterruptedException {
+ return getStatus(info, zPath) != null;
+ }
+
+ public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String source,
+ String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ Stat stat = null;
+ if (!exists(info, source))
+ throw KeeperException.create(Code.NONODE, source);
+ if (exists(info, destination)) {
+ switch (policy) {
+ case OVERWRITE:
+ break;
+ case SKIP:
+ return;
+ case FAIL:
+ default:
+ throw KeeperException.create(Code.NODEEXISTS, source);
+ }
+ }
+
+ stat = new Stat();
+ byte[] data = getData(info, source, stat);
+
+ if (stat.getEphemeralOwner() == 0) {
+ if (data == null)
+ throw KeeperException.create(Code.NONODE, source);
+ putPersistentData(info, destination, data, policy);
+ if (stat.getNumChildren() > 0) {
+ List<String> children;
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ children = getZooKeeper(info).getChildren(source, false);
+ break;
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT
+ || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+ retry.waitForNextAttempt();
+ }
+ for (String child : children) {
+ recursiveCopyPersistent(info, source + "/" + child, destination + "/" + child, policy);
+ }
+ }
+ }
+ }
+
+ public static boolean putPrivatePersistentData(ZooKeeperConnectionInfo info, String zPath,
+ byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
+ }
+
+ public static String putPersistentSequential(ZooKeeperConnectionInfo info, String zPath,
+ byte[] data) throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static String putEphemeralData(ZooKeeperConnectionInfo info, String zPath, byte[] data)
+ throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static String putEphemeralSequential(ZooKeeperConnectionInfo info, String zPath,
+ byte[] data) throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static byte[] getLockData(ZooCache zc, String path) {
+
+ List<String> children = zc.getChildren(path);
+
+ if (children == null || children.size() == 0) {
+ return null;
+ }
+
+ children = new ArrayList<>(children);
+ Collections.sort(children);
+
+ String lockNode = children.get(0);
+
+ return zc.get(path + "/" + lockNode);
+ }
+
+ public static boolean isLockHeld(ZooKeeperConnectionInfo info, LockID lid)
+ throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
+ List<String> children = getZooKeeper(info).getChildren(lid.path, false);
+
+ if (children.size() == 0) {
+ return false;
+ }
+
+ Collections.sort(children);
+
+ String lockNode = children.get(0);
+ if (!lid.node.equals(lockNode))
+ return false;
+
+ Stat stat = getZooKeeper(info).exists(lid.path + "/" + lid.node, false);
+ return stat != null && stat.getEphemeralOwner() == lid.eid;
+ } catch (KeeperException ex) {
+ final Code c = ex.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, ex);
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
- public static List<ACL> getACL(ZooKeeperConnectionInfo info, String zPath, Stat stat)
++ public static List<ACL> getACL(ZooKeeper zk, String zPath, Stat stat)
+ throws KeeperException, InterruptedException {
+ final Retry retry = RETRY_FACTORY.createRetry();
+ while (true) {
+ try {
- return getZooKeeper(info).getACL(zPath, stat);
++ return zk.getACL(zPath, stat);
+ } catch (KeeperException e) {
+ final Code c = e.code();
+ if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+ retryOrThrow(retry, e);
+ } else {
+ throw e;
+ }
+ }
+
+ retry.waitForNextAttempt();
+ }
+ }
+
+ public static String getRoot(final String instanceId) {
+ return Constants.ZROOT + "/" + instanceId;
+ }
+
+ public static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration conf,
+ Configuration hadoopConf) {
+ try {
+ FileSystem fs = VolumeConfiguration.getVolume(instanceDirectory.toString(), hadoopConf, conf)
+ .getFileSystem();
+ FileStatus[] files = null;
+ try {
+ files = fs.listStatus(instanceDirectory);
+ } catch (FileNotFoundException ex) {
+ // ignored
+ }
+ log.debug("Trying to read instance id from {}", instanceDirectory);
+ if (files == null || files.length == 0) {
+ log.error("unable obtain instance id at {}", instanceDirectory);
+ throw new RuntimeException(
+ "Accumulo not initialized, there is no instance id at " + instanceDirectory);
+ } else if (files.length != 1) {
+ log.error("multiple potential instances in {}", instanceDirectory);
+ throw new RuntimeException(
+ "Accumulo found multiple possible instance ids in " + instanceDirectory);
+ } else {
+ return files[0].getPath().getName();
+ }
+ } catch (IOException e) {
+ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, e);
+ throw new RuntimeException(
+ "Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory,
+ e);
+ } catch (IllegalArgumentException exception) {
+ /* HDFS throws this when there's a UnknownHostException due to DNS troubles. */
+ if (exception.getCause() instanceof UnknownHostException) {
+ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, exception);
+ }
+ throw exception;
+ }
+ }
+
+}