You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/17 21:20:35 UTC
[03/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper
from hbase-server and hbase-client
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
new file mode 100644
index 0000000..2f2b036
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Reads and publishes, through ZooKeeper, the unique identifier of an HBase
+ * cluster. As described by the original authors, the active master reads the
+ * stored identifier from the file system on startup, after which it is
+ * available to all watchers (including clients).
+ */
+@InterfaceAudience.Private
+public class ZKClusterId {
+  private ZKWatcher watcher;
+  private Abortable abortable;
+  /** Cached cluster ID; stays null until a read from ZooKeeper succeeds. */
+  private String id;
+
+  public ZKClusterId(ZKWatcher watcher, Abortable abortable) {
+    this.watcher = watcher;
+    this.abortable = abortable;
+  }
+
+  /**
+   * @return true if {@link #getId()} yields a non-null cluster ID
+   */
+  public boolean hasId() {
+    return getId() != null;
+  }
+
+  /**
+   * Returns the cluster ID, reading it from ZooKeeper if it has not been cached yet.
+   * A {@link KeeperException} raised by the read is passed to the {@link Abortable}.
+   * @return the cluster ID, or null if none could be read
+   */
+  public String getId() {
+    if (id != null) {
+      return id;
+    }
+    try {
+      id = readClusterIdZNode(watcher);
+    } catch (KeeperException ke) {
+      abortable.abort("Unexpected exception from ZooKeeper reading cluster ID", ke);
+    }
+    return id;
+  }
+
+  /**
+   * Reads the cluster ID znode and converts its content to a string.
+   * @param watcher watcher used to reach ZooKeeper; also supplies the znode path
+   * @return the cluster ID, or null if the existence check reports -1, the znode
+   *         holds no data, or the calling thread is interrupted while reading
+   * @throws KeeperException on a ZooKeeper error, or when the stored bytes cannot
+   *         be deserialized
+   */
+  public static String readClusterIdZNode(ZKWatcher watcher)
+      throws KeeperException {
+    final String clusterIdZNode = watcher.znodePaths.clusterIdZNode;
+    if (ZKUtil.checkExists(watcher, clusterIdZNode) == -1) {
+      return null;
+    }
+    final byte[] rawId;
+    try {
+      rawId = ZKUtil.getData(watcher, clusterIdZNode);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller and report "no ID".
+      Thread.currentThread().interrupt();
+      return null;
+    }
+    if (rawId == null) {
+      return null;
+    }
+    try {
+      return ClusterId.parseFrom(rawId).toString();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * Writes the given cluster ID to the cluster ID znode.
+   * @param watcher watcher used to reach ZooKeeper; also supplies the znode path
+   * @param id cluster ID to store
+   * @throws KeeperException on a ZooKeeper error
+   */
+  public static void setClusterId(ZKWatcher watcher, ClusterId id)
+      throws KeeperException {
+    final byte[] serializedId = id.toByteArray();
+    ZKUtil.createSetData(watcher, watcher.znodePaths.clusterIdZNode, serializedId);
+  }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper, or null if no cluster ID could be read
+   * @throws KeeperException if reading the cluster ID znode fails
+   */
+  public static UUID getUUIDForCluster(ZKWatcher zkw) throws KeeperException {
+    final String clusterId = readClusterIdZNode(zkw);
+    if (clusterId == null) {
+      return null;
+    }
+    return UUID.fromString(clusterId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
new file mode 100644
index 0000000..edd2ccd
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles coordination of a single "leader" instance among many possible
+ * candidates. The first {@link ZKLeaderManager} to successfully create
+ * the given znode becomes the leader, allowing the instance to continue
+ * with whatever processing must be protected. Other {@link ZKLeaderManager}
+ * instances will wait to be notified of changes to the leader znode.
+ * If the current master instance fails, the ephemeral leader znode will
+ * be removed, and all waiting instances will be notified, with the race
+ * to claim the leader znode beginning all over again.
+ * @deprecated Not used
+ */
+@Deprecated
+@InterfaceAudience.Private
+public class ZKLeaderManager extends ZKListener {
+ private static final Log LOG = LogFactory.getLog(ZKLeaderManager.class);
+
+ private final AtomicBoolean leaderExists = new AtomicBoolean();
+ private String leaderZNode;
+ private byte[] nodeId;
+ private Stoppable candidate;
+
+ public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
+ byte[] identifier, Stoppable candidate) {
+ super(watcher);
+ this.leaderZNode = leaderZNode;
+ this.nodeId = identifier;
+ this.candidate = candidate;
+ }
+
+ public void start() {
+ try {
+ watcher.registerListener(this);
+ String parent = ZKUtil.getParent(leaderZNode);
+ if (ZKUtil.checkExists(watcher, parent) < 0) {
+ ZKUtil.createWithParents(watcher, parent);
+ }
+ } catch (KeeperException ke) {
+ watcher.abort("Unhandled zk exception when starting", ke);
+ candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
+ }
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ if (leaderZNode.equals(path) && !candidate.isStopped()) {
+ handleLeaderChange();
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if (leaderZNode.equals(path) && !candidate.isStopped()) {
+ handleLeaderChange();
+ }
+ }
+
+ private void handleLeaderChange() {
+ try {
+ synchronized(leaderExists) {
+ if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
+ LOG.info("Found new leader for znode: "+leaderZNode);
+ leaderExists.set(true);
+ } else {
+ LOG.info("Leader change, but no new leader found");
+ leaderExists.set(false);
+ leaderExists.notifyAll();
+ }
+ }
+ } catch (KeeperException ke) {
+ watcher.abort("ZooKeeper error checking for leader znode", ke);
+ candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
+ }
+ }
+
+ /**
+ * Blocks until this instance has claimed the leader ZNode in ZooKeeper
+ */
+ public void waitToBecomeLeader() {
+ while (!candidate.isStopped()) {
+ try {
+ if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
+ // claimed the leader znode
+ leaderExists.set(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Claimed the leader znode as '"+
+ Bytes.toStringBinary(nodeId)+"'");
+ }
+ return;
+ }
+
+ // if claiming the node failed, there should be another existing node
+ byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
+ if (currentId != null && Bytes.equals(currentId, nodeId)) {
+ // claimed with our ID, but we didn't grab it, possibly restarted?
+ LOG.info("Found existing leader with our ID ("+
+ Bytes.toStringBinary(nodeId)+"), removing");
+ ZKUtil.deleteNode(watcher, leaderZNode);
+ leaderExists.set(false);
+ } else {
+ LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
+ leaderExists.set(true);
+ }
+ } catch (KeeperException ke) {
+ watcher.abort("Unexpected error from ZK, stopping candidate", ke);
+ candidate.stop("Unexpected error from ZK: "+ke.getMessage());
+ return;
+ }
+
+ // wait for next chance
+ synchronized(leaderExists) {
+ while (leaderExists.get() && !candidate.isStopped()) {
+ try {
+ leaderExists.wait();
+ } catch (InterruptedException ie) {
+ LOG.debug("Interrupted waiting on leader", ie);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the leader znode, if it is currently claimed by this instance.
+ */
+ public void stepDownAsLeader() {
+ try {
+ synchronized(leaderExists) {
+ if (!leaderExists.get()) {
+ return;
+ }
+ byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
+ if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
+ LOG.info("Stepping down as leader");
+ ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
+ leaderExists.set(false);
+ } else {
+ LOG.info("Not current leader, no need to step down");
+ }
+ }
+ } catch (KeeperException ke) {
+ watcher.abort("Unhandled zookeeper exception removing leader node", ke);
+ candidate.stop("Unhandled zookeeper exception removing leader node: "
+ + ke.getMessage());
+ } catch (InterruptedException e) {
+ watcher.abort("Unhandled zookeeper exception removing leader node", e);
+ candidate.stop("Unhandled zookeeper exception removing leader node: "
+ + e.getMessage());
+ }
+ }
+
+ public boolean hasLeader() {
+ return leaderExists.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
new file mode 100644
index 0000000..595e713
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for internal listeners of ZooKeeper events.
+ *
+ * <p>The process-wide {@link ZKWatcher} invokes the matching callback on every
+ * registered listener. A listener only receives events after registering
+ * itself through {@link ZKWatcher#registerListener}.
+ *
+ * <p>All callbacks default to doing nothing, so subclasses override only the
+ * ones they care about.
+ *
+ * <p>The watcher is blocked while a callback runs, so implementations must
+ * return quickly.
+ */
+@InterfaceAudience.Private
+public abstract class ZKListener {
+
+  // The zk watcher this listener belongs to; it also carries configuration and constants
+  protected ZKWatcher watcher;
+
+  /**
+   * Creates a listener bound to the given watcher.
+   * @param zkWatcher watcher that will deliver events to this listener
+   */
+  public ZKListener(ZKWatcher zkWatcher) {
+    watcher = zkWatcher;
+  }
+
+  /**
+   * Callback for the creation of a node.
+   * @param path full path of the node that was created
+   */
+  public void nodeCreated(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for the deletion of a node.
+   * @param path full path of the node that was deleted
+   */
+  public void nodeDeleted(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for a data change on an existing node.
+   * @param path full path of the node whose data changed
+   */
+  public void nodeDataChanged(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for a child being added to or removed from an existing node.
+   * @param path full path of the node whose set of children changed
+   */
+  public void nodeChildrenChanged(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * @return the watcher this listener was constructed with
+   */
+  public ZKWatcher getWatcher() {
+    return watcher;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
new file mode 100644
index 0000000..9cb0e7d
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.shaded.com.google.common.base.Stopwatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeperMain;
+
+/**
+ * Tool for running ZookeeperMain from HBase by reading a ZooKeeper server
+ * from HBase XML configuration.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ZKMainServer {
+  private static final String SERVER_ARG = "-server";
+
+  /**
+   * @param c configuration to read the ensemble from
+   * @return the quorum servers string computed by {@link ZKConfig} for the configuration
+   */
+  public String parse(final Configuration c) {
+    return ZKConfig.getZKQuorumServersString(c);
+  }
+
+  /**
+   * ZooKeeper 3.4.6 broke being able to pass commands on command line.
+   * See ZOOKEEPER-1897. This class is a hack to restore this facility.
+   */
+  private static class HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain extends ZooKeeperMain {
+    /**
+     * Builds the ZooKeeper shell and waits, polling once per millisecond, until
+     * the client reports a connected state.
+     * @param args arguments handed to {@link ZooKeeperMain}
+     * @throws IOException if the superclass constructor fails
+     * @throws InterruptedException if interrupted while sleeping, or if the
+     *           connection is still not up after more than 10 seconds
+     */
+    public HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(String[] args)
+        throws IOException, InterruptedException {
+      super(args);
+      // Make sure we are connected before we proceed. Can take a while on some systems. If we
+      // run the command without being connected, we get ConnectionLoss KeeperErrorConnection...
+      Stopwatch stopWatch = Stopwatch.createStarted();
+      while (!this.zk.getState().isConnected()) {
+        Thread.sleep(1);
+        if (stopWatch.elapsed(TimeUnit.SECONDS) > 10) {
+          throw new InterruptedException("Failed connect after waiting " +
+              stopWatch.elapsed(TimeUnit.SECONDS) + " seconds; state=" + this.zk.getState() +
+              "; " + this.zk);
+        }
+      }
+    }
+
+    /**
+     * Run the command-line args passed. Calls System.exit when done.
+     * @throws KeeperException if processing the command fails in ZooKeeper
+     * @throws IOException if processing the command fails with an I/O error
+     * @throws InterruptedException if interrupted while processing the command
+     */
+    void runCmdLine() throws KeeperException, IOException, InterruptedException {
+      processCmd(this.cl);
+      System.exit(0);
+    }
+  }
+
+  /**
+   * @param args command-line arguments
+   * @return True if the first argument is '-server'.
+   */
+  private static boolean hasServer(final String[] args) {
+    return args.length > 0 && args[0].equals(SERVER_ARG);
+  }
+
+  /**
+   * @param args command-line arguments
+   * @return True if arguments were passed beyond an optional '-server VALUE' pair.
+   * @throws IllegalStateException if '-server' is given without a value
+   */
+  private static boolean hasCommandLineArguments(final String[] args) {
+    if (hasServer(args)) {
+      if (args.length < 2) {
+        throw new IllegalStateException("-server param but no value");
+      }
+      return args.length > 2;
+    }
+    return args.length > 0;
+  }
+
+  /**
+   * Run the tool.
+   * @param args Command line arguments. May start with '-server VALUE'; when it
+   *          does not, the ensemble read from the HBase configuration is
+   *          prepended. Any remaining arguments are run as a ZooKeeper command.
+   */
+  public static void main(String[] args) throws Exception {
+    String[] newArgs = args;
+    if (!hasServer(args)) {
+      // Add the zk ensemble from configuration if none passed on command-line.
+      Configuration conf = HBaseConfiguration.create();
+      String hostport = new ZKMainServer().parse(conf);
+      if (hostport != null && hostport.length() > 0) {
+        newArgs = new String[args.length + 2];
+        System.arraycopy(args, 0, newArgs, 2, args.length);
+        newArgs[0] = SERVER_ARG;
+        newArgs[1] = hostport;
+      }
+    }
+    // If command-line arguments, run our hack so they are executed.
+    // ZOOKEEPER-1897 was committed to zookeeper-3.4.6 but elsewhere in this class we say
+    // 3.4.6 breaks command-processing; TODO.
+    if (hasCommandLineArguments(args)) {
+      HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain zkm =
+          new HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(newArgs);
+      zkm.runCmdLine();
+    } else {
+      ZooKeeperMain.main(newArgs);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
new file mode 100644
index 0000000..20d4a55
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Bridges ZooKeeper events into the metrics subsystem: each listener callback
+ * is forwarded to the corresponding counter or latency recorder of a
+ * {@link MetricsZooKeeperSource}.
+ */
+@InterfaceAudience.Private
+public class ZKMetrics implements ZKMetricsListener {
+  /** Destination of every metric update made by this class. */
+  private final MetricsZooKeeperSource source;
+
+  /**
+   * Creates an instance backed by the {@link MetricsZooKeeperSource} obtained
+   * from {@link CompatibilitySingletonFactory}.
+   */
+  public ZKMetrics() {
+    this(CompatibilitySingletonFactory.getInstance(MetricsZooKeeperSource.class));
+  }
+
+  /**
+   * Creates an instance backed by the supplied source.
+   * @param metricsSource source that receives all updates
+   */
+  @VisibleForTesting
+  public ZKMetrics(MetricsZooKeeperSource metricsSource) {
+    source = metricsSource;
+  }
+
+  /** Increments the auth-failed counter. */
+  @Override
+  public void registerAuthFailedException() {
+    this.source.incrementAuthFailedCount();
+  }
+
+  /** Increments the connection-loss counter. */
+  @Override
+  public void registerConnectionLossException() {
+    this.source.incrementConnectionLossCount();
+  }
+
+  /** Increments the data-inconsistency counter. */
+  @Override
+  public void registerDataInconsistencyException() {
+    this.source.incrementDataInconsistencyCount();
+  }
+
+  /** Increments the invalid-ACL counter. */
+  @Override
+  public void registerInvalidACLException() {
+    this.source.incrementInvalidACLCount();
+  }
+
+  /** Increments the no-auth counter. */
+  @Override
+  public void registerNoAuthException() {
+    this.source.incrementNoAuthCount();
+  }
+
+  /** Increments the operation-timeout counter. */
+  @Override
+  public void registerOperationTimeoutException() {
+    this.source.incrementOperationTimeoutCount();
+  }
+
+  /** Increments the runtime-inconsistency counter. */
+  @Override
+  public void registerRuntimeInconsistencyException() {
+    this.source.incrementRuntimeInconsistencyCount();
+  }
+
+  /** Increments the session-expired counter. */
+  @Override
+  public void registerSessionExpiredException() {
+    this.source.incrementSessionExpiredCount();
+  }
+
+  /** Increments the system-error counter. */
+  @Override
+  public void registerSystemErrorException() {
+    this.source.incrementSystemErrorCount();
+  }
+
+  /** Increments the counter of failed ZooKeeper calls. */
+  @Override
+  public void registerFailedZKCall() {
+    this.source.incrementTotalFailedZKCalls();
+  }
+
+  /** Records the latency of a read operation. */
+  @Override
+  public void registerReadOperationLatency(long latency) {
+    this.source.recordReadOperationLatency(latency);
+  }
+
+  /** Records the latency of a write operation. */
+  @Override
+  public void registerWriteOperationLatency(long latency) {
+    this.source.recordWriteOperationLatency(latency);
+  }
+
+  /** Records the latency of a sync operation. */
+  @Override
+  public void registerSyncOperationLatency(long latency) {
+    this.source.recordSyncOperationLatency(latency);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
new file mode 100644
index 0000000..f17925e
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Callback interface for reporting ZooKeeper failures and operation latencies.
+ * Each method registers one occurrence of the named event; the constants in the
+ * method comments presumably correspond to ZooKeeper KeeperException codes
+ * (TODO confirm against the callers).
+ */
+@InterfaceAudience.Private
+public interface ZKMetricsListener {
+
+ /**
+ * An AUTHFAILED Exception was seen.
+ */
+ void registerAuthFailedException();
+
+ /**
+ * A CONNECTIONLOSS Exception was seen.
+ */
+ void registerConnectionLossException();
+
+ /**
+ * A DATAINCONSISTENCY Exception was seen.
+ */
+ void registerDataInconsistencyException();
+
+ /**
+ * An INVALIDACL Exception was seen.
+ */
+ void registerInvalidACLException();
+
+ /**
+ * A NOAUTH Exception was seen.
+ */
+ void registerNoAuthException();
+
+ /**
+ * An OPERATIONTIMEOUT Exception was seen.
+ */
+ void registerOperationTimeoutException();
+
+ /**
+ * A RUNTIMEINCONSISTENCY Exception was seen.
+ */
+ void registerRuntimeInconsistencyException();
+
+ /**
+ * A SESSIONEXPIRED Exception was seen.
+ */
+ void registerSessionExpiredException();
+
+ /**
+ * A SYSTEMERROR Exception was seen.
+ */
+ void registerSystemErrorException();
+
+ /**
+ * A ZooKeeper API Call failed.
+ */
+ void registerFailedZKCall();
+
+ /**
+ * Register the latency incurred for read operations.
+ * @param latency observed latency (time unit not specified here; TODO confirm with callers)
+ */
+ void registerReadOperationLatency(long latency);
+
+ /**
+ * Register the latency incurred for write operations.
+ * @param latency observed latency (time unit not specified here; TODO confirm with callers)
+ */
+ void registerWriteOperationLatency(long latency);
+
+ /**
+ * Register the latency incurred for sync operations.
+ * @param latency observed latency (time unit not specified here; TODO confirm with callers)
+ */
+ void registerSyncOperationLatency(long latency);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
new file mode 100644
index 0000000..8ce41e3
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the availability and value of a single ZooKeeper node.
+ *
+ * <p>Utilizes the {@link ZKListener} interface to get the necessary
+ * ZooKeeper events related to the node.
+ *
+ * <p>This is the base class used by trackers in both the Master and
+ * RegionServers.
+ */
+@InterfaceAudience.Private
+public abstract class ZKNodeTracker extends ZKListener {
+  // LOG is being used in subclasses, hence keeping it protected
+  protected static final Log LOG = LogFactory.getLog(ZKNodeTracker.class);
+  /** Path of node being tracked */
+  protected final String node;
+
+  /** Data of the node being tracked */
+  private byte [] data;
+
+  /** Used to abort if a fatal error occurs */
+  protected final Abortable abortable;
+
+  /** Set by {@link #stop()}; makes blocked callers give up waiting. */
+  private boolean stopped = false;
+
+  /**
+   * Constructs a new ZK node tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher watcher used for all ZooKeeper access
+   * @param node path of the node to track
+   * @param abortable notified when an unexpected ZooKeeper error occurs
+   */
+  public ZKNodeTracker(ZKWatcher watcher, String node,
+      Abortable abortable) {
+    super(watcher);
+    this.node = node;
+    this.abortable = abortable;
+    this.data = null;
+  }
+
+  /**
+   * Starts the tracking of the node in ZooKeeper.
+   *
+   * <p>Use {@link #blockUntilAvailable()} to block until the node is available
+   * or {@link #getData(boolean)} to get the data of the node if it is available.
+   */
+  public synchronized void start() {
+    this.watcher.registerListener(this);
+    try {
+      if (ZKUtil.watchAndCheckExists(watcher, node)) {
+        byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+        if (data != null) {
+          this.data = data;
+        } else {
+          // It existed but now does not, try again to ensure a watch is set
+          LOG.debug("Try starting again because there is no data from " + node);
+          start();
+        }
+      }
+    } catch (KeeperException e) {
+      abortable.abort("Unexpected exception during initialization, aborting", e);
+    }
+  }
+
+  /**
+   * Marks the tracker as stopped and wakes every thread waiting in
+   * {@link #blockUntilAvailable(long, boolean)}.
+   */
+  public synchronized void stop() {
+    this.stopped = true;
+    notifyAll();
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available.
+   *
+   * @return data of the node
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public synchronized byte [] blockUntilAvailable()
+      throws InterruptedException {
+    return blockUntilAvailable(0, false);
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available or the
+   * specified timeout has elapsed.
+   *
+   * @param timeout maximum time to wait for the node data to be available,
+   *          in milliseconds. Pass 0 for no timeout.
+   * @param refresh whether to first re-read the data from ZooKeeper, and to
+   *          re-check the node's existence while waiting
+   * @return data of the node, or null if the tracker was stopped or the timeout
+   *         elapsed before data became available
+   * @throws InterruptedException if the waiting thread is interrupted
+   * @throws IllegalArgumentException if timeout is negative
+   */
+  public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
+      throws InterruptedException {
+    if (timeout < 0) {
+      throw new IllegalArgumentException();
+    }
+    boolean notimeout = timeout == 0;
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    if (refresh) {
+      try {
+        // This does not create a watch if the node does not exist
+        this.data = ZKUtil.getDataAndWatch(watcher, node);
+      } catch(KeeperException e) {
+        // The abort may be ignored (e.g. by an empty Abortable), so the failure
+        // is logged as well.
+        LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+        abortable.abort("Unexpected exception handling blockUntilAvailable", e);
+      }
+    }
+    boolean nodeExistsChecked = (!refresh || data != null);
+    while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
+      if (!nodeExistsChecked) {
+        try {
+          nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
+        } catch (KeeperException e) {
+          LOG.warn(
+            "Got exception while trying to check existence in ZooKeeper" +
+            " of the node: "+node+", retrying if timeout not reached",e );
+        }
+
+        // It did not exist, and now it does.
+        if (nodeExistsChecked) {
+          LOG.debug("Node " + node + " now exists, resetting a watcher");
+          try {
+            // This does not create a watch if the node does not exist
+            this.data = ZKUtil.getDataAndWatch(watcher, node);
+          } catch (KeeperException e) {
+            LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+            abortable.abort("Unexpected exception handling blockUntilAvailable", e);
+          }
+        }
+      }
+      // We expect a notification, but wake up periodically to lower the impact of
+      // a race condition, if any. Never sleep past the caller's deadline: when a
+      // timeout was requested, remaining is > 0 here, so wait(0) cannot occur.
+      wait(notimeout ? 100 : Math.min(100, remaining));
+      remaining = timeout - (System.currentTimeMillis() - startTime);
+    }
+    return this.data;
+  }
+
+  /**
+   * Gets the data of the node.
+   *
+   * <p>If the node is currently available, the most up-to-date known version of
+   * the data is returned. If the node is not currently available, null is
+   * returned.
+   * @param refresh whether to refresh the data by calling ZK directly.
+   * @return data of the node, null if unavailable
+   */
+  public synchronized byte [] getData(boolean refresh) {
+    if (refresh) {
+      try {
+        this.data = ZKUtil.getDataAndWatch(watcher, node);
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling getData", e);
+      }
+    }
+    return this.data;
+  }
+
+  /**
+   * @return path of the node being tracked
+   */
+  public String getNode() {
+    return this.node;
+  }
+
+  /**
+   * Re-reads the tracked node's data and wakes waiting threads when data is
+   * present; if the read yields no data, the event is handled as a deletion.
+   */
+  @Override
+  public synchronized void nodeCreated(String path) {
+    if (!path.equals(node)) {
+      return;
+    }
+    try {
+      byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+      if (data != null) {
+        this.data = data;
+        notifyAll();
+      } else {
+        nodeDeleted(path);
+      }
+    } catch(KeeperException e) {
+      abortable.abort("Unexpected exception handling nodeCreated event", e);
+    }
+  }
+
+  /**
+   * Clears the cached data for the tracked node, unless the node turns out to
+   * exist again, in which case the event is handled as a creation.
+   */
+  @Override
+  public synchronized void nodeDeleted(String path) {
+    if (path.equals(node)) {
+      try {
+        if (ZKUtil.watchAndCheckExists(watcher, node)) {
+          nodeCreated(path);
+        } else {
+          this.data = null;
+        }
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling nodeDeleted event", e);
+      }
+    }
+  }
+
+  /** Handles a data change on the tracked node exactly like a creation. */
+  @Override
+  public synchronized void nodeDataChanged(String path) {
+    if (path.equals(node)) {
+      nodeCreated(path);
+    }
+  }
+
+  /**
+   * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
+   * exists.
+   * @return false if the existence check reports that the baseznode is missing;
+   *         true otherwise. Note that true is also returned when the check
+   *         fails with a {@link KeeperException}, after the abortable has been
+   *         notified.
+   */
+  public boolean checkIfBaseNodeAvailable() {
+    try {
+      if (ZKUtil.checkExists(watcher, watcher.znodePaths.baseZNode) == -1) {
+        return false;
+      }
+    } catch (KeeperException e) {
+      abortable.abort("Exception while checking if basenode (" + watcher.znodePaths.baseZNode
+          + ") exists in ZooKeeper.",
+        e);
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    // Close the quote opened before the node path so the output is balanced.
+    return "ZKNodeTracker{" +
+        "node='" + node + "', stopped=" + stopped + '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
new file mode 100644
index 0000000..8116c23
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Tool for reading ZooKeeper servers from HBase XML configuration and producing
+ * a line-by-line list for use by bash scripts.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ZKServerTool {
+  /**
+   * Reads the ZooKeeper quorum setting from the configuration and turns each
+   * comma-separated {@code host[:port]} entry into a {@link ServerName}.
+   * Entries without an explicit port get the default ZooKeeper client port.
+   * @param conf configuration to read {@link HConstants#ZOOKEEPER_QUORUM} from;
+   *          {@link HConstants#LOCALHOST} is used when the key is not set
+   * @return one server name per quorum entry, in configuration order
+   */
+  public static ServerName[] readZKNodes(Configuration conf) {
+    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    List<ServerName> servers = new LinkedList<>();
+
+    for (String entry : quorum.split(",")) {
+      String[] hostAndPort = entry.split(":");
+      String host = hostAndPort[0];
+      int port = hostAndPort.length > 1
+          ? Integer.parseInt(hostAndPort[1])
+          : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+      servers.add(ServerName.valueOf(host, port, -1));
+    }
+    return servers.toArray(new ServerName[servers.size()]);
+  }
+
+  /**
+   * Run the tool: prints one "ZK host: &lt;hostname&gt;" line per configured server.
+   * @param args Command line arguments (not used).
+   */
+  public static void main(String[] args) {
+    ServerName[] servers = readZKNodes(HBaseConfiguration.create());
+    for (ServerName server : servers) {
+      // bin/zookeeper.sh relies on the "ZK host" string for grepping which is case sensitive.
+      System.out.println("ZK host: " + server.getHostname());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
new file mode 100644
index 0000000..33cc43e
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Common methods and attributes used by SplitLogManager and SplitLogWorker running
+ * distributed splitting of WAL logs.
+ */
+@InterfaceAudience.Private
+public class ZKSplitLog {
+  private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+  /**
+   * Gets the full path node name for the log file being split.
+   * This method will url encode the filename.
+   * @param zkw zk reference
+   * @param filename log file name (only the basename)
+   * @return the split-log znode joined with the url-encoded filename
+   */
+  public static String getEncodedNodeName(ZKWatcher zkw, String filename) {
+    return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, encode(filename));
+  }
+
+  /**
+   * Recovers the log file name from a znode path.
+   * @param node znode path, or a bare node name without any '/'
+   * @return the url-decoded text that follows the last '/' in {@code node}
+   */
+  public static String getFileName(String node) {
+    String basename = node.substring(node.lastIndexOf('/') + 1);
+    return decode(basename);
+  }
+
+  /**
+   * URL-encodes the given string using UTF-8.
+   * @param s the string to encode
+   * @return the encoded string
+   * @throws RuntimeException if UTF-8 is reported as unsupported; the original
+   *           exception is attached as the cause
+   */
+  static String encode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Keep the original exception as the cause so the stack trace is not lost.
+      throw new RuntimeException("URLENCODER doesn't support UTF-8", e);
+    }
+  }
+
+  /**
+   * URL-decodes the given string using UTF-8.
+   * @param s the string to decode
+   * @return the decoded string
+   * @throws RuntimeException if UTF-8 is reported as unsupported; the original
+   *           exception is attached as the cause
+   */
+  static String decode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Keep the original exception as the cause so the stack trace is not lost.
+      throw new RuntimeException("URLDecoder doesn't support UTF-8", e);
+    }
+  }
+
+  /**
+   * @param zkw zk reference
+   * @return the split-log znode joined with the name "RESCAN"
+   */
+  public static String getRescanNode(ZKWatcher zkw) {
+    return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, "RESCAN");
+  }
+
+  /**
+   * @param name the last part in path
+   * @return whether the node name represents a rescan node
+   */
+  public static boolean isRescanNode(String name) {
+    return name.startsWith("RESCAN");
+  }
+
+  /**
+   * @param zkw zk reference
+   * @param path the absolute path, starts with '/'
+   * @return whether the path represents a rescan node
+   */
+  public static boolean isRescanNode(ZKWatcher zkw, String path) {
+    String prefix = getRescanNode(zkw);
+    // The path must be strictly longer than the prefix: a path equal to the
+    // bare rescan node path is not reported as a rescan node.
+    return path.length() > prefix.length() && path.startsWith(prefix);
+  }
+
+  /**
+   * @param rootdir root directory
+   * @param tmpname name of the directory entry below the split-log directory
+   * @return {@code rootdir/SPLIT_LOGDIR_NAME/tmpname}
+   */
+  public static Path getSplitLogDir(Path rootdir, String tmpname) {
+    return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
+  }
+
+  /**
+   * Flags a log file as corrupted by creating a file named "corrupt" inside
+   * the log file's split-log directory. This is best effort: an
+   * {@link IOException} is logged as a warning and not propagated.
+   * @param rootdir root directory
+   * @param logFileName name of the log file to flag
+   * @param fs file system on which the marker file is created
+   */
+  public static void markCorrupted(Path rootdir, String logFileName,
+      FileSystem fs) {
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
+    try {
+      fs.createNewFile(file);
+    } catch (IOException e) {
+      LOG.warn("Could not flag a log file as corrupted. Failed to create " +
+          file, e);
+    }
+  }
+
+  /**
+   * Checks whether the "corrupt" marker file exists for a log file.
+   * @param rootdir root directory
+   * @param logFileName name of the log file to check
+   * @param fs file system on which the marker file is looked up
+   * @return true if the marker file exists
+   * @throws IOException if the existence check fails
+   */
+  public static boolean isCorrupted(Path rootdir, String logFileName,
+      FileSystem fs) throws IOException {
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
+    return fs.exists(file);
+  }
+}