You are viewing a plain-text version of this content; the link to the canonical (HTML) version was not preserved in this copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/07/28 08:44:29 UTC
svn commit: r1151751 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/
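src/main/java/org/apache/hadoop/hbase/util/ src/main/java/org/apache/hadoop/hbase/zookeeper/
src/test/java/org/apache/hadoop/hbase/ src/test/java/org/apache/hadoop/hbase/master/
src/test/java/org/apache/hadoop/hbase/regionserver/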
Author: stack
Date: Thu Jul 28 06:44:27 2011
New Revision: 1151751
URL: http://svn.apache.org/viewvc?rev=1151751&view=rev
Log:
HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 28 06:44:27 2011
@@ -342,6 +342,7 @@ Release 0.91.0 - Unreleased
HBASE-1938 Make in-memory table scanning faster (nkeywal)
HBASE-4143 HTable.doPut(List) should check the writebuffer length every so often
(Doug Meil via Ted Yu)
+ HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss (Liyin Tang)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Jul 28 06:44:27 2011
@@ -1706,8 +1706,8 @@ public class HConnectionManager {
}
this.servers.clear();
if (this.zooKeeper != null) {
- LOG.info("Closed zookeeper sessionid=0x"
- + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+ LOG.info("Closed zookeeper sessionid=0x" +
+ Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
this.zooKeeper.close();
this.zooKeeper = null;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Jul 28 06:44:27 2011
@@ -1145,7 +1145,7 @@ public class AssignmentManager extends Z
LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
// Async exists to set a watcher so we'll get triggered when
// unassigned node changes.
- this.zkw.getZooKeeper().exists(path, this.zkw,
+ this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Jul 28 06:44:27 2011
@@ -370,7 +370,7 @@ implements HMasterInterface, HMasterRegi
LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" +
- Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
+ Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
", cluster-up flag was=" + wasUp);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Thu Jul 28 06:44:27 2011
@@ -197,6 +197,7 @@ public class SplitLogManager extends Zoo
* @throws IOException
* if there was an error while splitting any log file
* @return cumulative size of the logfiles split
+ * @throws KeeperException
*/
public long splitLogDistributed(final Path logDir) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
@@ -370,7 +371,8 @@ public class SplitLogManager extends Zoo
private void getDataSetWatch(String path, Long retry_count) {
- this.watcher.getZooKeeper().getData(path, this.watcher,
+ this.watcher.getRecoverableZooKeeper().getZooKeeper().
+ getData(path, this.watcher,
new GetDataAsyncCallback(), retry_count);
tot_mgr_get_data_queued.incrementAndGet();
}
@@ -524,7 +526,8 @@ public class SplitLogManager extends Zoo
private void deleteNode(String path, Long retries) {
tot_mgr_node_delete_queued.incrementAndGet();
- this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(),
+ this.watcher.getRecoverableZooKeeper().getZooKeeper().
+ delete(path, -1, new DeleteAsyncCallback(),
retries);
}
@@ -551,9 +554,11 @@ public class SplitLogManager extends Zoo
/**
* signal the workers that a task was resubmitted by creating the
* RESCAN node.
+ * @throws KeeperException
*/
private void createRescanNode(long retries) {
- watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher),
+ this.watcher.getRecoverableZooKeeper().getZooKeeper().
+ create(ZKSplitLog.getRescanNode(watcher),
TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
new CreateRescanAsyncCallback(), new Long(retries));
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Jul 28 06:44:27 2011
@@ -879,7 +879,7 @@ public class HRegionServer implements HR
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
", sessionid=0x" +
- Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+ Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
isOnline = true;
} catch (Throwable e) {
this.isOnline = false;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Thu Jul 28 06:44:27 2011
@@ -329,7 +329,7 @@ public class SplitLogWorker extends ZooK
*/
private boolean ownTask(boolean isFirstTime) {
try {
- Stat stat = this.watcher.getZooKeeper().setData(currentTask,
+ Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
TaskState.TASK_OWNED.get(serverName), currentVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + currentTask);
@@ -392,8 +392,9 @@ public class SplitLogWorker extends ZooK
}
void getDataSetWatchAsync() {
- this.watcher.getZooKeeper().getData(currentTask, this.watcher,
- new GetDataAsyncCallback(), null);
+ this.watcher.getRecoverableZooKeeper().getZooKeeper().
+ getData(currentTask, this.watcher,
+ new GetDataAsyncCallback(), null);
tot_wkr_get_data_queued.incrementAndGet();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Thu Jul 28 06:44:27 2011
@@ -310,7 +310,16 @@ public class Bytes {
public static String toStringBinary(final byte [] b) {
return toStringBinary(b, 0, b.length);
}
-
+
+ /**
+ * The same as {@link #toStringBinary(byte[])}, but returns a string "null"
+ * if given a null argument.
+ */
+ public static String toStringBinarySafe(final byte [] b) {
+ if (b == null)
+ return "null";
+ return toStringBinary(b, 0, b.length);
+ }
/**
* Write a printable representation of a byte array. Non-printable
* characters are hex escaped in the format \\x%02X, eg:
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounter {
+ private final int maxRetries;
+ private int retriesRemaining;
+ private final int retryIntervalMillis;
+ private final TimeUnit timeUnit;
+
+ public RetryCounter(int maxRetries,
+ int retryIntervalMillis, TimeUnit timeUnit) {
+ this.maxRetries = maxRetries;
+ this.retriesRemaining = maxRetries;
+ this.retryIntervalMillis = retryIntervalMillis;
+ this.timeUnit = timeUnit;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void sleepUntilNextRetry() throws InterruptedException {
+ timeUnit.sleep(retryIntervalMillis);
+ }
+
+ public boolean shouldRetry() {
+ return retriesRemaining > 0;
+ }
+
+ public void useRetry() {
+ retriesRemaining--;
+ }
+
+ public int getAttemptTimes() {
+ return maxRetries-retriesRemaining+1;
+ }
+}
\ No newline at end of file
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounterFactory {
+ private final int maxRetries;
+ private final int retryIntervalMillis;
+
+ public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
+ this.maxRetries = maxRetries;
+ this.retryIntervalMillis = retryIntervalMillis;
+ }
+
+ public RetryCounter create() {
+ return new RetryCounter(
+ maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS
+ );
+ }
+}
\ No newline at end of file
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,661 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", x-333-110". The process will know that the original create
+ * succeeded an the znode it created is "x-352-109".
+ * @see http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
+ */
+public class RecoverableZooKeeper {
+ private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+ // the actual ZooKeeper client instance
+ private ZooKeeper zk;
+ private final RetryCounterFactory retryCounterFactory;
+ // An identifier of this process in the cluster
+ private final String identifier;
+ private final byte[] id;
+ private int retryIntervalMillis;
+
+ private static final int ID_OFFSET = Bytes.SIZEOF_INT;
+ // the magic number is to be backward compatible
+ private static final byte MAGIC =(byte) 0XFF;
+ private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
+
+ public RecoverableZooKeeper(String quorumServers, int seesionTimeout,
+ Watcher watcher, int maxRetries, int retryIntervalMillis)
+ throws IOException {
+ this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
+ this.retryCounterFactory =
+ new RetryCounterFactory(maxRetries, retryIntervalMillis);
+ this.retryIntervalMillis = retryIntervalMillis;
+
+ // the identifier = processID@hostName
+ this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+ LOG.info("The identifier of this process is " + identifier);
+ this.id = Bytes.toBytes(identifier);
+ }
+
+ /**
+ * delete is an idempotent operation. Retry before throw out exception.
+ * This function will not throw out NoNodeException if the path is not existed
+ * @param path
+ * @param version
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public void delete(String path, int version)
+ throws InterruptedException, KeeperException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ boolean isRetry = false; // False for first attempt, true for all retries.
+ while (true) {
+ try {
+ zk.delete(path, version);
+ return;
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case NONODE:
+ if (isRetry) {
+ LOG.info("Node " + path + " already deleted. Assuming that a " +
+ "previous attempt succeeded.");
+ return;
+ }
+ LOG.warn("Node " + path + " already deleted, and this is not a " +
+ "retry");
+ throw e;
+
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper delete failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ isRetry = true;
+ }
+ }
+
+ /**
+ * exists is an idempotent operation. Retry before throw out exception
+ * @param path
+ * @param watcher
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public Stat exists(String path, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ return zk.exists(path, watcher);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper exists failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * exists is an idempotent operation. Retry before throw out exception
+ * @param path
+ * @param watch
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public Stat exists(String path, boolean watch)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ return zk.exists(path, watch);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper exists failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * getChildren is an idempotent operation. Retry before throw out exception
+ * @param path
+ * @param watcher
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public List<String> getChildren(String path, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ return zk.getChildren(path, watcher);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper getChildren failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * getChildren is an idempotent operation. Retry before throw out exception
+ * @param path
+ * @param watch
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public List<String> getChildren(String path, boolean watch)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ return zk.getChildren(path, watch);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper getChildren failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * getData is an idempotent operation. Retry before throw out exception
+ * @param path
+ * @param watcher
+ * @param stat
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] getData(String path, Watcher watcher, Stat stat)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ byte[] revData = zk.getData(path, watcher, stat);
+ return this.removeMetaData(revData);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper getData failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * getData is an idemnpotent operation. Retry before throw out exception
+ * @param path
+ * @param watch
+ * @param stat
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] getData(String path, boolean watch, Stat stat)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try {
+ byte[] revData = zk.getData(path, watch, stat);
+ return this.removeMetaData(revData);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper getData failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
+ * Adding an identifier field into the data to check whether
+ * badversion is caused by the result of previous correctly setData
+ * @param path
+ * @param data
+ * @param version
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public Stat setData(String path, byte[] data, int version)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ byte[] newData = appendMetaData(data);
+ while (true) {
+ try {
+ return zk.setData(path, newData, version);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper setData failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+ case BADVERSION:
+ // try to verify whether the previous setData success or not
+ try{
+ Stat stat = new Stat();
+ byte[] revData = zk.getData(path, false, stat);
+ int idLength = Bytes.toInt(revData, ID_OFFSET);
+ int dataLength = revData.length-ID_OFFSET-idLength;
+ int dataOffset = ID_OFFSET+idLength;
+
+ if(Bytes.compareTo(revData, ID_OFFSET, id.length,
+ revData, dataOffset, dataLength) == 0) {
+ // the bad version is caused by previous successful setData
+ return stat;
+ }
+ } catch(KeeperException keeperException){
+ // the ZK is not reliable at this moment. just throw out exception
+ throw keeperException;
+ }
+
+ // throw out other exceptions and verified bad version exceptions
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ /**
+ * <p>
+ * NONSEQUENTIAL create is idempotent operation.
+ * Retry before throw out exceptions.
+ * But this function will not throw out the NodeExist exception back to the
+ * application.
+ * </p>
+ * <p>
+ * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
+ * identifier to the path to verify, whether the previous one is successful
+ * or not.
+ * </p>
+ *
+ * @param path
+ * @param data
+ * @param acl
+ * @param createMode
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public String create(String path, byte[] data, List<ACL> acl,
+ CreateMode createMode)
+ throws KeeperException, InterruptedException {
+ byte[] newData = appendMetaData(data);
+ switch (createMode) {
+ case EPHEMERAL:
+ case PERSISTENT:
+ return createNonSequential(path, newData, acl, createMode);
+
+ case EPHEMERAL_SEQUENTIAL:
+ case PERSISTENT_SEQUENTIAL:
+ return createSequential(path, newData, acl, createMode);
+
+ default:
+ throw new IllegalArgumentException("Unrecognized CreateMode: " +
+ createMode);
+ }
+ }
+
+ private String createNonSequential(String path, byte[] data, List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ boolean isRetry = false; // False for first attempt, true for all retries.
+ while (true) {
+ try {
+ return zk.create(path, data, acl, createMode);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case NODEEXISTS:
+ if (isRetry) {
+ // If the connection was lost, there is still a possibility that
+ // we have successfully created the node at our previous attempt,
+ // so we read the node and compare.
+ byte[] currentData = zk.getData(path, false, null);
+ if (currentData != null &&
+ Bytes.compareTo(currentData, data) == 0) {
+ // We successfully created a non-sequential node
+ return path;
+ }
+ LOG.error("Node " + path + " already exists with " +
+ Bytes.toStringBinarySafe(currentData) + ", could not write " +
+ Bytes.toStringBinarySafe(data));
+ throw e;
+ }
+ LOG.error("Node " + path + " already exists and this is not a " +
+ "retry");
+ throw e;
+
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper create failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ isRetry = true;
+ }
+ }
+
+ private String createSequential(String path, byte[] data,
+ List<ACL> acl, CreateMode createMode)
+ throws KeeperException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ boolean first = true;
+ String newPath = path+this.identifier;
+ while (true) {
+ try {
+ if (!first) {
+ // Check if we succeeded on a previous attempt
+ String previousResult = findPreviousSequentialNode(newPath);
+ if (previousResult != null) {
+ return previousResult;
+ }
+ }
+ first = false;
+ return zk.create(newPath, data, acl, createMode);
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case OPERATIONTIMEOUT:
+ LOG.warn("Possibly transient ZooKeeper exception: " + e);
+ if (!retryCounter.shouldRetry()) {
+ LOG.error("ZooKeeper create failed after "
+ + retryCounter.getMaxRetries() + " retries");
+ throw e;
+ }
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+ "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ }
+ }
+
+ private String findPreviousSequentialNode(String path)
+ throws KeeperException, InterruptedException {
+ int lastSlashIdx = path.lastIndexOf('/');
+ assert(lastSlashIdx != -1);
+ String parent = path.substring(0, lastSlashIdx);
+ String nodePrefix = path.substring(lastSlashIdx+1);
+
+ List<String> nodes = zk.getChildren(parent, false);
+ List<String> matching = filterByPrefix(nodes, nodePrefix);
+ for (String node : matching) {
+ String nodePath = parent + "/" + node;
+ Stat stat = zk.exists(nodePath, false);
+ if (stat != null) {
+ return nodePath;
+ }
+ }
+ return null;
+ }
+
+ public byte[] removeMetaData(byte[] data) {
+ if(data == null || data.length == 0) {
+ return data;
+ }
+ // check the magic data; to be backward compatible
+ byte magic = data[0];
+ if(magic != MAGIC) {
+ return data;
+ }
+
+ int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+ int dataLength = data.length-MAGIC_OFFSET-ID_OFFSET-idLength;
+ int dataOffset = MAGIC_OFFSET+ID_OFFSET+idLength;
+
+ byte[] newData = new byte[dataLength];
+ System.arraycopy(data, dataOffset, newData, 0, dataLength);
+
+ return newData;
+
+ }
+
+ private byte[] appendMetaData(byte[] data) {
+ if(data == null){
+ return null;
+ }
+
+ byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+ int pos = 0;
+ pos = Bytes.putByte(newData, pos, MAGIC);
+ pos = Bytes.putInt(newData, pos, id.length);
+ pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+ pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+ return newData;
+ }
+
+ public long getSessionId() {
+ return zk.getSessionId();
+ }
+
+ public void close() throws InterruptedException {
+ zk.close();
+ }
+
+ public States getState() {
+ return zk.getState();
+ }
+
+ public ZooKeeper getZooKeeper() {
+ return zk;
+ }
+
+ public byte[] getSessionPasswd() {
+ return zk.getSessionPasswd();
+ }
+
+ public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+ this.zk.sync(path, null, null);
+ }
+
+ /**
+ * Filters the given node list by the given prefixes.
+ * This method is all-inclusive--if any element in the node list starts
+ * with any of the given prefixes, then it is included in the result.
+ *
+ * @param nodes the nodes to filter
+ * @param prefixes the prefixes to include in the result
+ * @return list of every element that starts with one of the prefixes
+ */
+ private static List<String> filterByPrefix(List<String> nodes,
+ String... prefixes) {
+ List<String> lockChildren = new ArrayList<String>();
+ for (String child : nodes){
+ for (String prefix : prefixes){
+ if (child.startsWith(prefix)){
+ lockChildren.add(child);
+ break;
+ }
+ }
+ }
+ return lockChildren;
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Thu Jul 28 06:44:27 2011
@@ -70,20 +70,20 @@ public class ZKUtil {
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
- public static ZooKeeper connect(Configuration conf, Watcher watcher)
+ public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
throws IOException {
Properties properties = ZKConfig.makeZKProps(conf);
String ensemble = ZKConfig.getZKQuorumServersString(properties);
return connect(conf, ensemble, watcher);
}
- public static ZooKeeper connect(Configuration conf, String ensemble,
+ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, "");
}
- public static ZooKeeper connect(Configuration conf, String ensemble,
+ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher, final String descriptor)
throws IOException {
if(ensemble == null) {
@@ -92,7 +92,11 @@ public class ZKUtil {
int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
ensemble + ")");
- return new ZooKeeper(ensemble, timeout, watcher);
+ int retry = conf.getInt("zookeeper.recovery.retry", 3);
+ int retryIntervalMillis =
+ conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+ return new RecoverableZooKeeper(ensemble, timeout, watcher,
+ retry, retryIntervalMillis);
}
//
@@ -214,7 +218,7 @@ public class ZKUtil {
public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- Stat s = zkw.getZooKeeper().exists(znode, zkw);
+ Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
return s != null ? true : false;
} catch (KeeperException e) {
@@ -242,7 +246,7 @@ public class ZKUtil {
public static int checkExists(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- Stat s = zkw.getZooKeeper().exists(znode, null);
+ Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
return s != null ? s.getVersion() : -1;
} catch (KeeperException e) {
LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
@@ -279,7 +283,7 @@ public class ZKUtil {
ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
+ List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
return children;
} catch(KeeperException.NoNodeException ke) {
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
@@ -339,7 +343,7 @@ public class ZKUtil {
List<String> children = null;
try {
// List the children without watching
- children = zkw.getZooKeeper().getChildren(znode, null);
+ children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
} catch(KeeperException.NoNodeException nne) {
return null;
} catch(InterruptedException ie) {
@@ -389,7 +393,7 @@ public class ZKUtil {
public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
+ return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
} catch(KeeperException.NoNodeException ke) {
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
"because node does not exist (not an error)"));
@@ -421,7 +425,7 @@ public class ZKUtil {
public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- Stat stat = zkw.getZooKeeper().exists(znode, null);
+ Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
return stat == null ? 0 : stat.getNumChildren();
} catch(KeeperException e) {
LOG.warn(zkw.prefix("Unable to get children of node " + znode));
@@ -443,7 +447,7 @@ public class ZKUtil {
public static byte [] getData(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- byte [] data = zkw.getZooKeeper().getData(znode, null, null);
+ byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
logRetrievedMsg(zkw, znode, data, false);
return data;
} catch (KeeperException.NoNodeException e) {
@@ -475,7 +479,7 @@ public class ZKUtil {
public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
- byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
+ byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null);
logRetrievedMsg(zkw, znode, data, true);
return data;
} catch (KeeperException.NoNodeException e) {
@@ -512,7 +516,7 @@ public class ZKUtil {
Stat stat)
throws KeeperException {
try {
- byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
+ byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
logRetrievedMsg(zkw, znode, data, false);
return data;
} catch (KeeperException.NoNodeException e) {
@@ -549,7 +553,7 @@ public class ZKUtil {
byte [] data, int expectedVersion)
throws KeeperException {
try {
- zkw.getZooKeeper().setData(znode, data, expectedVersion);
+ zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
} catch(InterruptedException ie) {
zkw.interruptedException(ie);
}
@@ -583,7 +587,7 @@ public class ZKUtil {
byte [] data, int expectedVersion)
throws KeeperException, KeeperException.NoNodeException {
try {
- return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null;
+ return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
} catch (InterruptedException e) {
zkw.interruptedException(e);
return false;
@@ -654,7 +658,7 @@ public class ZKUtil {
String znode, byte [] data)
throws KeeperException {
try {
- zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException nee) {
if(!watchAndCheckExists(zkw, znode)) {
@@ -693,11 +697,11 @@ public class ZKUtil {
ZooKeeperWatcher zkw, String znode, byte [] data)
throws KeeperException {
try {
- zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
try {
- zkw.getZooKeeper().exists(znode, zkw);
+ zkw.getRecoverableZooKeeper().exists(znode, zkw);
} catch (InterruptedException e) {
zkw.interruptedException(e);
return false;
@@ -730,9 +734,9 @@ public class ZKUtil {
String znode, byte [] data)
throws KeeperException, KeeperException.NodeExistsException {
try {
- zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- return zkw.getZooKeeper().exists(znode, zkw).getVersion();
+ return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion();
} catch (InterruptedException e) {
zkw.interruptedException(e);
return -1;
@@ -757,7 +761,7 @@ public class ZKUtil {
public static void asyncCreate(ZooKeeperWatcher zkw,
String znode, byte [] data, final AsyncCallback.StringCallback cb,
final Object ctx) {
- zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, cb, ctx);
}
@@ -775,7 +779,7 @@ public class ZKUtil {
String znode)
throws KeeperException {
try {
- ZooKeeper zk = zkw.getZooKeeper();
+ RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
if (zk.exists(znode, false) == null) {
zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -783,7 +787,7 @@ public class ZKUtil {
} catch(KeeperException.NodeExistsException nee) {
} catch(KeeperException.NoAuthException nee){
try {
- if (null == zkw.getZooKeeper().exists(znode, false)) {
+ if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
// If we failed to create the file and it does not already exist.
throw(nee);
}
@@ -813,7 +817,7 @@ public class ZKUtil {
if(znode == null) {
return;
}
- zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch(KeeperException.NodeExistsException nee) {
return;
@@ -845,7 +849,7 @@ public class ZKUtil {
int version)
throws KeeperException {
try {
- zkw.getZooKeeper().delete(node, version);
+ zkw.getRecoverableZooKeeper().delete(node, version);
return true;
} catch(KeeperException.BadVersionException bve) {
return false;
@@ -864,7 +868,7 @@ public class ZKUtil {
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
throws KeeperException {
try {
- zkw.getZooKeeper().delete(node, -1);
+ zkw.getRecoverableZooKeeper().delete(node, -1);
} catch(KeeperException.NoNodeException nne) {
} catch(InterruptedException ie) {
zkw.interruptedException(ie);
@@ -886,7 +890,7 @@ public class ZKUtil {
deleteNodeRecursively(zkw, joinZNode(node, child));
}
}
- zkw.getZooKeeper().delete(node, -1);
+ zkw.getRecoverableZooKeeper().delete(node, -1);
} catch(InterruptedException ie) {
zkw.interruptedException(ie);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Thu Jul 28 06:44:27 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.zookeepe
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.util.Thre
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
/**
* Acts as the single ZooKeeper Watcher. One instance of this is instantiated
@@ -58,7 +56,7 @@ public class ZooKeeperWatcher implements
private String quorum;
// zookeeper connection
- private ZooKeeper zooKeeper;
+ private RecoverableZooKeeper recoverableZooKeeper;
// abortable in case of zk failure
private Abortable abortable;
@@ -116,51 +114,11 @@ public class ZooKeeperWatcher implements
this.identifier = descriptor;
this.abortable = abortable;
setNodeNames(conf);
- this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
+ this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
try {
// Create all the necessary "directories" of znodes
// TODO: Move this to an init method somewhere so not everyone calls it?
-
- // The first call against zk can fail with connection loss. Seems common.
- // Apparently this is recoverable. Retry a while.
- // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
- // TODO: Generalize out in ZKUtil.
- long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
- HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME);
- long finished = System.currentTimeMillis() + wait;
- KeeperException ke = null;
- do {
- try {
- ZKUtil.createAndFailSilent(this, baseZNode);
- ke = null;
- break;
- } catch (KeeperException.ConnectionLossException e) {
- if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
- LOG.debug("Retrying zk create for another " +
- (finished - System.currentTimeMillis()) +
- "ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
- "wait time); " + e.getMessage());
- }
- ke = e;
- }
- } while (isFinishedRetryingRecoverable(finished));
- // Convert connectionloss exception to ZKCE.
- if (ke != null) {
- try {
- // If we don't close it, the zk connection managers won't be killed
- this.zooKeeper.close();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while closing", e);
- }
- throw new ZooKeeperConnectionException("HBase is able to connect to" +
- " ZooKeeper but the connection closes immediately. This could be" +
- " a sign that the server has too many connections (30 is the" +
- " default). Consider inspecting your ZK server logs for that" +
- " error and then make sure you are reusing HBaseConfiguration" +
- " as often as you can. See HTable's javadoc for more information.",
- ke);
- }
+ ZKUtil.createAndFailSilent(this, baseZNode);
ZKUtil.createAndFailSilent(this, assignmentZNode);
ZKUtil.createAndFailSilent(this, rsZNode);
ZKUtil.createAndFailSilent(this, tableZNode);
@@ -235,8 +193,8 @@ public class ZooKeeperWatcher implements
* Get the connection to ZooKeeper.
* @return connection reference to zookeeper
*/
- public ZooKeeper getZooKeeper() {
- return zooKeeper;
+ public RecoverableZooKeeper getRecoverableZooKeeper() {
+ return recoverableZooKeeper;
}
/**
@@ -321,16 +279,16 @@ public class ZooKeeperWatcher implements
this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
while (System.currentTimeMillis() < finished) {
Threads.sleep(1);
- if (this.zooKeeper != null) break;
+ if (this.recoverableZooKeeper != null) break;
}
- if (this.zooKeeper == null) {
+ if (this.recoverableZooKeeper == null) {
LOG.error("ZK is null on connection event -- see stack trace " +
"for the stack trace when constructor was called on this zkw",
this.constructorCaller);
throw new NullPointerException("ZK is null");
}
this.identifier = this.identifier + "-0x" +
- Long.toHexString(this.zooKeeper.getSessionId());
+ Long.toHexString(this.recoverableZooKeeper.getSessionId());
// Update our identifier. Otherwise ignore.
LOG.debug(this.identifier + " connected");
break;
@@ -365,7 +323,7 @@ public class ZooKeeperWatcher implements
* is up-to-date from when we begin the operation.
*/
public void sync(String path) {
- this.zooKeeper.sync(path, null, null);
+ this.recoverableZooKeeper.sync(path, null, null);
}
/**
@@ -408,8 +366,8 @@ public class ZooKeeperWatcher implements
*/
public void close() {
try {
- if (zooKeeper != null) {
- zooKeeper.close();
+ if (recoverableZooKeeper != null) {
+ recoverableZooKeeper.close();
// super.close();
}
} catch (InterruptedException e) {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Jul 28 06:44:27 2011
@@ -1126,7 +1126,7 @@ public class HBaseTestingUtility {
Configuration c = new Configuration(this.conf);
String quorumServers = ZKConfig.getZKQuorumServersString(c);
int sessionTimeout = 5 * 1000; // 5 seconds
- ZooKeeper zk = nodeZK.getZooKeeper();
+ ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
byte[] password = zk.getSessionPasswd();
long sessionID = zk.getSessionId();
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Thu Jul 28 06:44:27 2011
@@ -99,9 +99,8 @@ public class TestZooKeeper {
int sessionTimeout = 5 * 1000; // 5 seconds
HConnection connection = HConnectionManager.getConnection(c);
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
- long sessionID = connectionZK.getZooKeeper().getSessionId();
- byte [] password = connectionZK.getZooKeeper().getSessionPasswd();
-
+ long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
+ byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
EmptyWatcher.instance, sessionID, password);
LOG.info("Session timeout=" + zk.getSessionTimeout() +
@@ -116,15 +115,16 @@ public class TestZooKeeper {
// Check that the old ZK connection is closed, means we did expire
System.err.println("ZooKeeper should have timed out");
- String state = connectionZK.getZooKeeper().getState().toString();
- Assert.assertTrue("State=" + state,
- connectionZK.getZooKeeper().getState().equals(States.CLOSED));
+ String state = connectionZK.getRecoverableZooKeeper().getState().toString();
+ LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
+ Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
+ equals(States.CLOSED));
// Check that the client recovered
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
- LOG.info("state=" + newConnectionZK.getZooKeeper().getState());
- Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals(
- States.CONNECTED));
+ LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
+ Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
+ States.CONNECTED));
}
@Test
@@ -272,4 +272,4 @@ public class TestZooKeeper {
ZKUtil.createAndFailSilent(zk2, aclZnode);
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Thu Jul 28 06:44:27 2011
@@ -198,7 +198,7 @@ public class TestSplitLogManager {
LOG.info("TestOrphanTaskAcquisition");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
- zkw.getZooKeeper().create(tasknode,
+ zkw.getRecoverableZooKeeper().create(tasknode,
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -231,7 +231,7 @@ public class TestSplitLogManager {
" startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task
- zkw.getZooKeeper().create(tasknode,
+ zkw.getRecoverableZooKeeper().create(tasknode,
TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
@@ -391,7 +391,7 @@ public class TestSplitLogManager {
// create an orphan task in OWNED state
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
- zkw.getZooKeeper().create(tasknode1,
+ zkw.getRecoverableZooKeeper().create(tasknode1,
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Thu Jul 28 06:44:27 2011
@@ -129,7 +129,7 @@ public class TestSplitLogWorker {
LOG.info("testAcquireTaskAtStartup");
ZKSplitLog.Counters.resetCounters();
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -161,7 +161,7 @@ public class TestSplitLogWorker {
LOG.info("testRaceForTask");
ZKSplitLog.Counters.resetCounters();
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -200,7 +200,7 @@ public class TestSplitLogWorker {
Thread.sleep(100);
// this time create a task node after starting the splitLogWorker
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -228,7 +228,7 @@ public class TestSplitLogWorker {
Thread.yield(); // let the worker start
Thread.sleep(100);
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -236,7 +236,7 @@ public class TestSplitLogWorker {
// now the worker is busy doing the above task
// create another task
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -264,7 +264,7 @@ public class TestSplitLogWorker {
Thread.yield(); // let the worker start
Thread.sleep(100);
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -277,7 +277,7 @@ public class TestSplitLogWorker {
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
// create a RESCAN node
- zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java Thu Jul 28 06:44:27 2011
@@ -139,7 +139,7 @@ public class TestSplitTransactionOnClust
String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
hri.getEncodedName());
Stat stats =
- t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+ t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
RegionTransitionData rtd =
ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
@@ -162,7 +162,7 @@ public class TestSplitTransactionOnClust
assertTrue(daughters.contains(r));
}
// Finally assert that the ephemeral SPLIT znode was cleaned up.
- stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+ stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
assertTrue(stats == null);
} finally {