You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/03/04 12:24:53 UTC
svn commit: r1452257 [5/14] - in /hbase/branches/0.94:
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/test/java/org/apache/hadoop/hbase/security/access/
src/main/jamon/org/apache/hadoop/hbase/tmpl/master/ src/main/java/org...
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * This is the notification interface for Procedures that encapsulates message passing from
+ * members to a coordinator. Each of these calls should send a message to the coordinator.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureMemberRpcs extends Closeable {
+
+ /**
+ * Initialize and start any threads or connections the member needs.
+ */
+ public void start(ProcedureMember member);
+
+ /**
+ * Each subprocedure is being executed on a member. This is the identifier for the member.
+ * @return the member name
+ */
+ public String getMemberName();
+
+ /**
+ * Notify the coordinator that we aborted the specified {@link Subprocedure}
+ *
+ * @param sub the {@link Subprocedure} we are aborting
+ * @param cause the reason why the member's subprocedure aborted
+ * @throws IOException thrown when the rpcs can't reach the other members of the procedure (and
+ * thus can't recover).
+ */
+ public void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
+
+ /**
+ * Notify the coordinator that the specified {@link Subprocedure} has acquired the locally required
+ * barrier condition.
+ *
+ * @param sub the specified {@link Subprocedure}
+ * @throws IOException if we can't reach the coordinator
+ */
+ public void sendMemberAcquired(Subprocedure sub) throws IOException;
+
+ /**
+ * Notify the coordinator that the specified {@link Subprocedure} has completed the work that
+ * needed to be done under the global barrier.
+ *
+ * @param sub the specified {@link Subprocedure}
+ * @throws IOException if we can't reach the coordinator
+ */
+ public void sendMemberCompleted(Subprocedure sub) throws IOException;
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
+import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
+
+/**
+ * Distributed procedure member's Subprocedure. A procedure is started on a ProcedureCoordinator
+ * which communicates with ProcedureMembers who create and start their part of the Procedure. This
+ * sub part is called a Subprocedure.
+ *
+ * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this
+ * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and
+ * {@link #cleanup(Exception)} (release state associated with subprocedure.)
+ *
+ * When submitted to a ProcedureMember, the call method is executed in a separate thread.
+ * Latches are used to block its progress and trigger continuations when barrier conditions are
+ * met.
+ *
+ * Any exception that makes it out of calls to {@link #acquireBarrier()} or {@link #insideBarrier()}
+ * gets converted into a {@link ForeignException}, which will get propagated to the
+ * {@link ProcedureCoordinator}.
+ *
+ * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific
+ * barrierName (ex: snapshot121126).
+ */
+abstract public class Subprocedure implements Callable<Void> {
+ private static final Log LOG = LogFactory.getLog(Subprocedure.class);
+
+ // Name of the procedure
+ final private String barrierName;
+
+ //
+ // Execution state
+ //
+
+ /** wait on before allowing the in barrier phase to proceed */
+ private final CountDownLatch inGlobalBarrier;
+ /** counted down when the Subprocedure has completed */
+ private final CountDownLatch releasedLocalBarrier;
+
+ //
+ // Error handling
+ //
+ /** monitor to check for errors */
+ protected final ForeignExceptionDispatcher monitor;
+ /** frequency to check for errors (ms) */
+ protected final long wakeFrequency;
+ protected final TimeoutExceptionInjector executionTimeoutTimer;
+ protected final ProcedureMemberRpcs rpcs;
+
+ private volatile boolean complete = false;
+
+ /**
+ * @param member reference to the member managing this subprocedure
+ * @param procName name of the procedure this subprocedure is associated with
+ * @param monitor notified if there is an error in the subprocedure
+ * @param wakeFrequency interval, in milliseconds, at which to wake and check the monitor for
+ * errors
+ * @param timeout time in millis that will trigger a subprocedure abort if it has not completed
+ */
+ public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
+ long wakeFrequency, long timeout) {
+ // Asserts should be caught during unit testing
+ assert member != null : "procedure member should be non-null";
+ assert member.getRpcs() != null : "rpc handlers should be non-null";
+ assert procName != null : "procedure name should be non-null";
+ assert monitor != null : "monitor should be non-null";
+
+ // Default to a very large timeout
+ this.rpcs = member.getRpcs();
+ this.barrierName = procName;
+ this.monitor = monitor;
+ // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be
+ // possible.
+ this.monitor.addListener(new ForeignExceptionListener() {
+ @Override
+ public void receive(ForeignException ee) {
+ // if this is a notification from a remote source, just log
+ if (ee.isRemote()) {
+ LOG.debug("Was remote foreign exception, not redispatching error", ee);
+ return;
+ }
+
+ // if it is local, then send it to the coordinator
+ try {
+ rpcs.sendMemberAborted(Subprocedure.this, ee);
+ } catch (IOException e) {
+ // this will fail all the running procedures, since the connection is down
+ LOG.error("Can't reach controller, not propagating error", e);
+ }
+ }
+ });
+
+ this.wakeFrequency = wakeFrequency;
+ this.inGlobalBarrier = new CountDownLatch(1);
+ this.releasedLocalBarrier = new CountDownLatch(1);
+
+ // accept error from timer thread, this needs to be started.
+ this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
+ }
+
+ public String getName() {
+ return barrierName;
+ }
+
+ public String getMemberName() {
+ return rpcs.getMemberName();
+ }
+
+ private void rethrowException() throws ForeignException {
+ monitor.rethrowException();
+ }
+
+ /**
+ * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods
+ * while keeping some state for other threads to access.
+ *
+ * This would normally be executed by the ProcedureMember when an acquire message comes from the
+ * coordinator. Rpcs are used to send messages back to the coordinator after different phases
+ * are executed. Any exceptions caught during the execution (except for InterruptedException) get
+ * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted(
+ * Subprocedure, ForeignException)}.
+ */
+ @SuppressWarnings("finally")
+ final public Void call() {
+ LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
+ executionTimeoutTimer.getMaxTime() + "ms");
+ // start the execution timeout timer
+ executionTimeoutTimer.start();
+
+ try {
+ // start by checking for error first
+ rethrowException();
+ LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
+ acquireBarrier();
+ LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
+
+ // vote yes to coordinator about being prepared
+ rpcs.sendMemberAcquired(this);
+ LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
+ " 'reached' or 'abort' from coordinator");
+
+ // wait for the procedure to reach global barrier before proceeding
+ waitForReachedGlobalBarrier();
+ rethrowException(); // if Coordinator aborts, will bail from here with exception
+
+ // In traditional 2PC, if a member reaches this state the TX has been committed and the
+ // member is responsible for rolling forward and recovering and completing the subsequent
+ // operations in the case of failure. It cannot rollback.
+ //
+ // This implementation is not 2PC since it can still rollback here, and thus has different
+ // semantics.
+
+ LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
+ insideBarrier();
+ LOG.debug("Subprocedure '" + barrierName + "' locally completed");
+
+ // Ack that the member has executed and released local barrier
+ rpcs.sendMemberCompleted(this);
+ LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
+
+ // make sure we didn't get an external exception
+ rethrowException();
+ } catch (Exception e) {
+ String msg = null;
+ if (e instanceof InterruptedException) {
+ msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
+ " Likely due to pool shutdown.";
+ Thread.currentThread().interrupt();
+ } else if (e instanceof ForeignException) {
+ msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
+ } else {
+ msg = "Subprocedure '" + barrierName + "' failed!";
+ }
+ LOG.error(msg , e);
+ cancel(msg, e);
+
+ LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
+ cleanup(e);
+ } finally {
+ releasedLocalBarrier.countDown();
+
+ // tell the timer we are done, if we get here successfully
+ executionTimeoutTimer.complete();
+ complete = true;
+ LOG.debug("Subprocedure '" + barrierName + "' completed.");
+ return null;
+ }
+ }
+
+ boolean isComplete() {
+ return complete;
+ }
+
+ /**
+ * exposed for testing.
+ */
+ ForeignExceptionSnare getErrorCheckable() {
+ return this.monitor;
+ }
+
+ /**
+ * The implementation of this method should gather and hold required resources (locks, disk
+ * space, etc) to satisfy the Procedure's barrier condition. For example, this would be where
+ * to quiesce all the regions on a RS for a procedure that requires all regions to be
+ * globally quiesced.
+ *
+ * Users should override this method. If quiescence is not required, this is overkill but
+ * can still be used to execute a procedure on all members and to propagate any exceptions.
+ *
+ * @throws ForeignException
+ */
+ abstract public void acquireBarrier() throws ForeignException;
+
+ /**
+ * The implementation of this method should act with the assumption that the barrier condition
+ * has been satisfied. Continuing the previous example, a condition could be that all RS's
+ * globally have been quiesced, and procedures that require this precondition could be
+ * implemented here.
+ *
+ * Users should override this method. If quiescence is not required, this can be a no-op.
+ *
+ * @throws ForeignException
+ */
+ abstract public void insideBarrier() throws ForeignException;
+
+ /**
+ * Users should override this method. The implementation of this method should roll back and
+ * clean up any temporary or partially completed state that the {@link #acquireBarrier()} may have
+ * created.
+ * @param e
+ */
+ abstract public void cleanup(Exception e);
+
+ /**
+ * Method to cancel the Subprocedure by injecting an exception from an external source.
+ * @param cause
+ */
+ public void cancel(String msg, Throwable cause) {
+ LOG.error(msg, cause);
+ if (cause instanceof ForeignException) {
+ monitor.receive((ForeignException) cause);
+ } else {
+ monitor.receive(new ForeignException(getMemberName(), cause));
+ }
+ }
+
+ /**
+ * Callback for the member rpcs to call when the global barrier has been reached. This
+ * unblocks the main subprocedure execution thread so that the Subprocedure's
+ * {@link #insideBarrier()} method can be run.
+ */
+ public void receiveReachedGlobalBarrier() {
+ inGlobalBarrier.countDown();
+ }
+
+ //
+ // Subprocedure Internal State interface
+ //
+
+ /**
+ * Wait for the reached global barrier notification.
+ *
+ * Package visibility for testing
+ *
+ * @throws ForeignException
+ * @throws InterruptedException
+ */
+ void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
+ Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
+ barrierName + ":remote acquired");
+ }
+
+ /**
+ * Waits until this subprocedure has locally completed, or has been aborted.
+ * @throws ForeignException
+ * @throws InterruptedException
+ */
+ public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
+ Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
+ barrierName + ":completed");
+ }
+
+ /**
+ * Empty Subprocedure for testing.
+ *
+ * Must be public for stubbing used in testing to work.
+ */
+ public static class SubprocedureImpl extends Subprocedure {
+
+ public SubprocedureImpl(ProcedureMember member, String opName,
+ ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
+ super(member, opName, monitor, wakeFrequency, timeout);
+ }
+
+ @Override
+ public void acquireBarrier() throws ForeignException {}
+
+ @Override
+ public void insideBarrier() throws ForeignException {}
+
+ @Override
+ public void cleanup(Exception e) {}
+ };
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Task builder to build instances of a {@link ProcedureMember}'s {@link Subprocedure}s.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface SubprocedureFactory {
+
+ /**
+ * Build {@link Subprocedure} when requested.
+ * @param procName name of the procedure associated with this subprocedure
+ * @param procArgs arguments passed from the coordinator about the procedure
+ * @return {@link Subprocedure} to run or <tt>null</tt> if no operation should be run
+ * @throws IllegalArgumentException if the operation could not be run because of errors in the
+ * request
+ * @throws IllegalStateException if the current runner cannot accept any more new requests
+ */
+ public Subprocedure buildSubprocedure(String procName, byte[] procArgs);
+}
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
+ public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
+ private ZKProcedureUtil zkProc = null;
+ protected ProcedureCoordinator coordinator = null; // if started this should be non-null
+
+ ZooKeeperWatcher watcher;
+ String procedureType;
+ String coordName;
+
+ /**
+ * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
+ * @param procedureClass procedure type name is a category for when there are multiple kinds of
+ * procedures -- this becomes a znode, so be aware of the naming restrictions
+ * @param coordName name of the node running the coordinator
+ * @throws KeeperException if an unexpected zk error occurs
+ */
+ public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
+ String procedureClass, String coordName) throws KeeperException {
+ this.watcher = watcher;
+ this.procedureType = procedureClass;
+ this.coordName = coordName;
+ }
+
+ /**
+ * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If a member's
+ * znode already exists there, the coordinator is notified of that acquire right away; otherwise
+ * a watch is set to wait for notification of the member's acquire node.
+ *
+ * @param proc the Procedure
+ * @param info data to be stored in the acquire node
+ * @param nodeNames children of the acquire phase
+ * @throws IOException if any failure occurs.
+ */
+ @Override
+ final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
+ throws IOException, IllegalArgumentException {
+ String procName = proc.getName();
+ // start watching for the abort node
+ String abortNode = zkProc.getAbortZNode(procName);
+ try {
+ // check to see if the abort node already exists
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
+ abort(abortNode);
+ }
+ // If we get an abort node watch triggered here, we'll go complete creating the acquired
+ // znode but then handle the acquire znode and bail out
+ } catch (KeeperException e) {
+ LOG.error("Failed to watch abort", e);
+ throw new IOException("Failed while watching abort node:" + abortNode, e);
+ }
+
+ // create the acquire barrier
+ String acquire = zkProc.getAcquiredBarrierNode(procName);
+ LOG.debug("Creating acquire znode:" + acquire);
+ try {
+ // notify all the procedure listeners to look for the acquire node
+ byte[] data = ProtobufUtil.prependPBMagic(info);
+ ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
+ // loop through all the children of the acquire phase and watch for them
+ for (String node : nodeNames) {
+ String znode = ZKUtil.joinZNode(acquire, node);
+ LOG.debug("Watching for acquire node:" + znode);
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+ coordinator.memberAcquiredBarrier(procName, node);
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Failed while creating acquire node:" + acquire, e);
+ }
+ }
+
+ @Override
+ public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
+ String procName = proc.getName();
+ String reachedNode = zkProc.getReachedBarrierNode(procName);
+ LOG.debug("Creating reached barrier zk node:" + reachedNode);
+ try {
+ // create the reached znode and watch for the reached znodes
+ ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
+ // loop through all the children of the reached phase and watch for them
+ for (String node : nodeNames) {
+ String znode = ZKUtil.joinZNode(reachedNode, node);
+ if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+ coordinator.memberFinishedBarrier(procName, node);
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Failed while creating reached node:" + reachedNode, e);
+ }
+ }
+
+
+ /**
+ * Delete znodes that are no longer in use.
+ */
+ @Override
+ final public void resetMembers(Procedure proc) throws IOException {
+ String procName = proc.getName();
+ boolean stillGettingNotifications = false;
+ do {
+ try {
+ LOG.debug("Attempting to clean out zk node for op:" + procName);
+ zkProc.clearZNodes(procName);
+ stillGettingNotifications = false;
+ } catch (KeeperException.NotEmptyException e) {
+ // recursive delete isn't transactional (yet) so we need to deal with cases where we get
+ // children trickling in
+ stillGettingNotifications = true;
+ } catch (KeeperException e) {
+ throw new IOException("Failed to complete reset procedure " + procName, e);
+ }
+ } while (stillGettingNotifications);
+ }
+
+ /**
+ * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they care about.
+ * @return true if succeed, false if encountered initialization errors.
+ */
+ final public boolean start(final ProcedureCoordinator coordinator) {
+ if (this.coordinator != null) {
+ throw new IllegalStateException(
+ "ZKProcedureCoordinator already started and already has listener installed");
+ }
+ this.coordinator = coordinator;
+
+ try {
+ this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
+ @Override
+ public void nodeCreated(String path) {
+ if (!isInProcedurePath(path)) return;
+ LOG.debug("Node created: " + path);
+ logZKTree(this.baseZNode);
+ if (isAcquiredPathNode(path)) {
+ // node wasn't present when we created the watch so zk event triggers acquire
+ coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
+ ZKUtil.getNodeName(path));
+ } else if (isReachedPathNode(path)) {
+ // node was absent when we created the watch so zk event triggers the finished barrier.
+
+ // TODO Nothing prevents acquire and reached znodes from showing up in the wrong order.
+ coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
+ ZKUtil.getNodeName(path));
+ } else if (isAbortPathNode(path)) {
+ abort(path);
+ }
+ }
+ };
+ zkProc.clearChildZNodes();
+ } catch (KeeperException e) {
+ LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
+ return false;
+ }
+
+ LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
+ return true;
+ }
+
+ /**
+ * This is the abort message being sent by the coordinator to the members
+ *
+ * TODO this code isn't actually used but can be used to issue a cancellation from the
+ * coordinator.
+ */
+ @Override
+ final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
+ String procName = proc.getName();
+ LOG.debug("Aborting procedure '" + procName + "' in zk");
+ String procAbortNode = zkProc.getAbortZNode(procName);
+ try {
+ LOG.debug("Creating abort znode:" + procAbortNode);
+ String source = (ee.getSource() == null) ? coordName : ee.getSource();
+ byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
+ // first create the znode for the procedure
+ ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
+ LOG.debug("Finished creating abort node:" + procAbortNode);
+ } catch (KeeperException e) {
+ // possible that we get this error for the procedure if we already reset the zk state, but in
+ // that case we should still get an error for that procedure anyways
+ zkProc.logZKTree(zkProc.baseZNode);
+ coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
+ + " to abort procedure '" + procName + "'", new IOException(e));
+ }
+ }
+
+ /**
+ * Receive a notification and propagate it to the local coordinator
+ * @param abortNode full znode path to the failed procedure information
+ */
+ protected void abort(String abortNode) {
+ String procName = ZKUtil.getNodeName(abortNode);
+ ForeignException ee = null;
+ try {
+ byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
+ if (!ProtobufUtil.isPBMagicPrefix(data)) {
+ LOG.warn("Got an error notification for op:" + abortNode
+ + " but we can't read the information. Killing the procedure.");
+ // we got a remote exception, but we can't describe it
+ ee = new ForeignException(coordName, "Data in abort node is illegally formatted. ignoring content.");
+ } else {
+
+ data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+ ee = ForeignException.deserialize(data);
+ }
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Got an error notification for op:" + abortNode
+ + " but we can't read the information. Killing the procedure.");
+ // we got a remote exception, but we can't describe it
+ ee = new ForeignException(coordName, e);
+ } catch (KeeperException e) {
+ coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
+ + zkProc.getAbortZnode(), new IOException(e));
+ }
+ coordinator.abortProcedure(procName, ee);
+ }
+
+ @Override
+ final public void close() throws IOException {
+ zkProc.close();
+ }
+
+ /**
+ * Used in testing
+ */
+ final ZKProcedureUtil getZkProcedureUtil() {
+ return zkProc;
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper based controller for a procedure member.
+ * <p>
+ * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
+ * since each procedure type is bound to a single set of znodes. You can have multiple
+ * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
+ * name, but each individual rpcs is still bound to a single member name (and since they are
+ * used to determine global progress, it's important to not get this wrong).
+ * <p>
+ * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
+ * time (as long as they have different types), from the same controller, but the same node name
+ * must be used for each procedure (though there is no conflict between the two procedure as long
+ * as they have distinct names).
+ * <p>
+ * There is no real error recovery with this mechanism currently -- if the coordinator fails,
+ * its re-initialization will delete the znodes and require all in progress subprocedures to start
+ * anew.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
+
+ private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
+ private final String memberName;
+
+ protected ProcedureMember member;
+ private ZKProcedureUtil zkController;
+
+ /**
+  * Builds the member-side rpcs together with the znode listener that dispatches ZooKeeper
+  * events to it. Must call {@link #start(ProcedureMember)} before this can be used.
+  * @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
+  *          {@link #close()}.
+  * @param procType name of the znode describing the procedure type
+  * @param memberName name of the member to join the procedure
+  * @throws KeeperException if we can't reach zookeeper
+  */
+ public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
+     String procType, String memberName) throws KeeperException {
+   this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
+     @Override
+     public void nodeCreated(String path) {
+       if (!isInProcedurePath(path)) {
+         return;
+       }
+
+       LOG.info("Received created event:" + path);
+       // one of the barrier roots itself was created: just re-establish the child watch
+       if (isAcquiredNode(path)) {
+         waitForNewProcedures();
+         return;
+       }
+       if (isAbortNode(path)) {
+         watchForAbortedProcedures();
+         return;
+       }
+
+       // otherwise dispatch on which barrier root is the direct parent of the new node
+       String parentZnode = ZKUtil.getParent(path);
+       if (isReachedNode(parentZnode)) {
+         // its the end barrier, the procedure can be completed
+         receivedReachedGlobalBarrier(path);
+       } else if (isAbortNode(parentZnode)) {
+         abort(path);
+       } else if (isAcquiredNode(parentZnode)) {
+         startNewSubprocedure(path);
+       } else {
+         LOG.debug("Ignoring created notification for node:" + path);
+       }
+     }
+
+     @Override
+     public void nodeChildrenChanged(String path) {
+       if (path.equals(this.acquiredZnode)) {
+         LOG.info("Received procedure start children changed event: " + path);
+         waitForNewProcedures();
+         return;
+       }
+       if (path.equals(this.abortZnode)) {
+         LOG.info("Received procedure abort children changed event: " + path);
+         watchForAbortedProcedures();
+       }
+     }
+   };
+   this.memberName = memberName;
+ }
+
+ /**
+ * @return the {@link ZKProcedureUtil} created in the constructor
+ */
+ public ZKProcedureUtil getZkController() {
+ return zkController;
+ }
+
+ /**
+ * @return the member name passed to the constructor
+ */
+ @Override
+ public String getMemberName() {
+ return memberName;
+ }
+
+ /**
+  * Forwards a "global barrier reached" notification to the procedure member.
+  * @param path full znode path that caused the notification; its node name is passed on as the
+  *          procedure name
+  */
+ private void receivedReachedGlobalBarrier(String path) {
+   LOG.debug("Recieved reached global barrier:" + path);
+   this.member.receivedReachedGlobalBarrier(ZKUtil.getNodeName(path));
+ }
+
+ /**
+  * Lists the children of the abort znode through
+  * {@code ZKUtil.listChildrenAndWatchForNewChildren} and runs {@link #abort(String)} for each
+  * child found. A ZooKeeper failure is reported to the member as a connection failure.
+  */
+ private void watchForAbortedProcedures() {
+   String abortRoot = zkController.getAbortZnode();
+   LOG.debug("Checking for aborted procedures on node: '" + abortRoot + "'");
+   try {
+     // this is the list of the currently aborted procedures
+     List<String> abortedProcedures =
+         ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), abortRoot);
+     // the same listing call is null-checked when looking for new procedures; a null result
+     // here means there is nothing to abort rather than something to iterate over
+     if (abortedProcedures == null) {
+       return;
+     }
+     for (String node : abortedProcedures) {
+       abort(ZKUtil.joinZNode(abortRoot, node));
+     }
+   } catch (KeeperException e) {
+     member.controllerConnectionFailure("Failed to list children for abort node:"
+         + abortRoot, new IOException(e));
+   }
+ }
+
+ /**
+  * Lists the children of the acquired barrier znode through
+  * {@code ZKUtil.listChildrenAndWatchForNewChildren} and tries to start a subprocedure for each
+  * child found. A ZooKeeper failure is reported to the member as a connection failure.
+  */
+ private void waitForNewProcedures() {
+   // watch for new procedures that we need to start subprocedures for
+   String acquiredRoot = zkController.getAcquiredBarrier();
+   LOG.debug("Looking for new procedures under znode:'" + acquiredRoot + "'");
+   List<String> runningProcedures;
+   try {
+     runningProcedures =
+         ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), acquiredRoot);
+   } catch (KeeperException e) {
+     member.controllerConnectionFailure("General failure when watching for new procedures",
+         new IOException(e));
+     // the failure has been reported; do not also log it as "no running procedures"
+     return;
+   }
+   if (runningProcedures == null) {
+     LOG.debug("No running procedures.");
+     return;
+   }
+   for (String procName : runningProcedures) {
+     // then read in the procedure information
+     startNewSubprocedure(ZKUtil.joinZNode(acquiredRoot, procName));
+   }
+ }
+
+ /**
+  * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
+  * <p>
+  * Will attempt to create the same procedure multiple times if a procedure znode with the same
+  * name is created. It is left up to the coordinator to ensure this doesn't occur.
+  * <p>
+  * Nothing is started when the procedure's abort znode already exists, or when the znode data
+  * does not carry the protobuf magic prefix.
+  * @param path full path to the znode for the procedure to start
+  */
+ private synchronized void startNewSubprocedure(String path) {
+   LOG.debug("Found procedure znode: " + path);
+   String opName = ZKUtil.getNodeName(path);
+   // start watching for an abort notification for the procedure
+   String abortZNode = zkController.getAbortZNode(opName);
+   try {
+     if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
+       LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
+       return;
+     }
+   } catch (KeeperException e) {
+     member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
+         + ") for procedure :" + opName, new IOException(e));
+     return;
+   }
+
+   // get the data for the procedure
+   Subprocedure subproc = null;
+   try {
+     byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
+     // the magic check below tolerates null data, so this log line must not dereference it
+     LOG.debug("start proc data length is "
+         + (data == null ? "null" : String.valueOf(data.length)));
+     if (!ProtobufUtil.isPBMagicPrefix(data)) {
+       String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
+           + "Killing the procedure.";
+       LOG.error(msg);
+       throw new IllegalArgumentException(msg);
+     }
+     data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+     LOG.debug("Found data for znode:" + path);
+     subproc = member.createSubprocedure(opName, data);
+     member.submitSubprocedure(subproc);
+   } catch (IllegalArgumentException iae ) {
+     // NOTE(review): subproc is still null if the failure happened before createSubprocedure
+     // returned, in which case sendMemberAborted only logs and posts nothing to zk
+     LOG.error("Illegal argument exception", iae);
+     sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
+   } catch (IllegalStateException ise) {
+     LOG.error("Illegal state exception ", ise);
+     sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
+   } catch (KeeperException e) {
+     member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
+         new IOException(e));
+   }
+ }
+
+ /**
+ * This attempts to create an acquired state znode for the procedure (snapshot name).
+ *
+ * It then looks for the reached znode to trigger in-barrier execution. If not present we
+ * have a watcher, if present then trigger the in-barrier action.
+ */
+ @Override
+ public void sendMemberAcquired(Subprocedure sub) throws IOException {
+ String procName = sub.getName();
+ try {
+ LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
+ + ") in zk");
+ String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
+ zkController, procName), memberName);
+ ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
+
+ // watch for the complete node for this snapshot
+ String reachedBarrier = zkController.getReachedBarrierNode(procName);
+ LOG.debug("Watch for global barrier reached:" + reachedBarrier);
+ if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
+ receivedReachedGlobalBarrier(reachedBarrier);
+ }
+ } catch (KeeperException e) {
+ member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
+ + procName + " and member: " + memberName, new IOException(e));
+ }
+ }
+
+ /**
+  * Acknowledges completion by creating this member's znode under the procedure's reached
+  * barrier node. A ZooKeeper failure is reported to the member as a connection failure.
+  */
+ @Override
+ public void sendMemberCompleted(Subprocedure sub) throws IOException {
+   String procName = sub.getName();
+   LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
+       + "' in zk");
+   String reachedZNode = zkController.getReachedBarrierNode(procName);
+   String joinPath = ZKUtil.joinZNode(reachedZNode, memberName);
+   try {
+     ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath);
+   } catch (KeeperException e) {
+     member.controllerConnectionFailure("Failed to post zk node:" + joinPath
+         + " to join procedure barrier.", new IOException(e));
+   }
+ }
+
+ /**
+  * Publishes a member-side failure: the exception is serialized, prefixed with the protobuf
+  * magic and written to the procedure's abort znode. When {@code sub} is null the failure is
+  * only logged.
+  */
+ @Override
+ public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
+   if (sub == null) {
+     LOG.error("Failed due to null subprocedure", ee);
+     return;
+   }
+
+   String procName = sub.getName();
+   LOG.debug("Aborting procedure (" + procName + ") in zk");
+   String procAbortZNode = zkController.getAbortZNode(procName);
+   try {
+     // fall back to our own name when the exception does not carry a source
+     String source = ee.getSource();
+     if (source == null) {
+       source = memberName;
+     }
+     byte[] serialized = ForeignException.serialize(source, ee);
+     byte[] errorInfo = ProtobufUtil.prependPBMagic(serialized);
+     ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
+     LOG.debug("Finished creating abort znode:" + procAbortZNode);
+   } catch (KeeperException e) {
+     // possible that we get this error for the procedure if we already reset the zk state, but in
+     // that case we should still get an error for that procedure anyways
+     zkController.logZKTree(zkController.getBaseZnode());
+     member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
+         + " to abort procedure", new IOException(e));
+   }
+ }
+
+ /**
+  * Pass along the found abort notification to the listener.
+  * <p>
+  * The znode data is expected to be a serialized {@link ForeignException} behind the protobuf
+  * magic prefix. If the prefix is missing or the payload cannot be deserialized, a locally
+  * built {@link ForeignException} is passed to the member instead.
+  * @param abortZNode full znode path to the failed procedure information
+  */
+ protected void abort(String abortZNode) {
+   LOG.debug("Aborting procedure member for znode " + abortZNode);
+   String opName = ZKUtil.getNodeName(abortZNode);
+   try {
+     byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
+
+     // figure out the data we need to pass
+     ForeignException ee;
+     try {
+       if (!ProtobufUtil.isPBMagicPrefix(data)) {
+         String msg = "Illegally formatted data in abort node for proc " + opName
+             + ". Killing the procedure.";
+         LOG.error(msg);
+         // we got a remote exception, but we can't describe it so just return exn from here
+         ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
+       } else {
+         // strip the magic prefix before handing the payload to the deserializer
+         data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+         ee = ForeignException.deserialize(data);
+       }
+     } catch (InvalidProtocolBufferException e) {
+       LOG.warn("Got an error notification for op:" + opName
+           + " but we can't read the information. Killing the procedure.");
+       // we got a remote exception, but we can't describe it so just return exn from here
+       ee = new ForeignException(getMemberName(), e);
+     }
+
+     this.member.receiveAbortProcedure(opName, ee);
+   } catch (KeeperException e) {
+     // keep the two paths apart in the message; they used to be glued together
+     member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
+         + " (under " + zkController.getAbortZnode() + ")", new IOException(e));
+   }
+ }
+
+ /**
+ * Binds this rpcs to the given member and performs the initial scan of the abort and acquired
+ * znodes.
+ * @param listener member that receives the notifications; stored in the {@code member} field
+ */
+ public void start(ProcedureMember listener) {
+ LOG.debug("Starting procedure member '" + this.memberName + "'");
+ this.member = listener;
+ // look at already-aborted procedures first, then at procedures waiting to be joined
+ watchForAbortedProcedures();
+ waitForNewProcedures();
+ }
+
+ /**
+ * Closes the {@link ZKProcedureUtil} created in the constructor.
+ */
+ @Override
+ public void close() throws IOException {
+ zkController.close();
+ }
+
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is a shared ZooKeeper-based znode management utils for distributed procedure. All znode
+ * operations should go through the provided methods in coordinators and members.
+ *
+ * Layout of nodes in ZK is
+ * /hbase/[op name]/acquired/
+ * [op instance] - op data/
+ * /[nodes that have acquired]
+ * /reached/
+ * [op instance]/
+ * /[nodes that have completed]
+ * /abort/
+ * [op instance] - failure data
+ *
+ * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
+ *
+ * Assumption here that procedure names are unique
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class ZKProcedureUtil
+ extends ZooKeeperListener implements Closeable {
+
+ private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
+
+ public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
+ public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
+ public static final String ABORT_ZNODE_DEFAULT = "abort";
+
+ public final String baseZNode;
+ protected final String acquiredZnode;
+ protected final String reachedZnode;
+ protected final String abortZnode;
+
+ protected final String memberName;
+
+ /**
+  * Top-level watcher/controller for procedures across the cluster.
+  * <p>
+  * On instantiation, this ensures the procedure znodes exist. This however requires the passed in
+  * watcher has been started.
+  * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
+  *          {@link #close()}
+  * @param procDescription name of the znode describing the procedure to run
+  * @param memberName name of the member from which we are interacting with running procedures
+  * @throws KeeperException when the procedure znodes cannot be created
+  */
+ public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
+     String memberName) throws KeeperException {
+   super(watcher);
+   this.memberName = memberName;
+   // setup paths for the zknodes used in procedures
+   this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
+   acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
+   reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
+   abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
+
+   // make sure we are listening for events. Registered only once the path fields above are
+   // assigned, so a callback can never run the path predicates against null paths
+   watcher.registerListener(this);
+
+   // first make sure all the ZK nodes exist
+   // make sure all the parents exist (sometimes not the case in tests)
+   ZKUtil.createWithParents(watcher, acquiredZnode);
+   // regular create because all the parents exist
+   ZKUtil.createAndFailSilent(watcher, reachedZnode);
+   ZKUtil.createAndFailSilent(watcher, abortZnode);
+ }
+
+ /**
+  * Closes the ZooKeeper watcher held by this listener, if there is one.
+  */
+ @Override
+ public void close() throws IOException {
+   if (watcher == null) {
+     return;
+   }
+   watcher.close();
+ }
+
+ /**
+ * Instance shortcut for {@link #getAcquireBarrierNode(ZKProcedureUtil, String)}.
+ * @param opInstanceName name of the running procedure instance
+ * @return the instance's znode path under this controller's acquired znode
+ */
+ public String getAcquiredBarrierNode(String opInstanceName) {
+ return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
+ }
+
+ /**
+ * Instance shortcut for {@link #getReachedBarrierNode(ZKProcedureUtil, String)}.
+ * @param opInstanceName name of the running procedure instance
+ * @return the instance's znode path under this controller's reached znode
+ */
+ public String getReachedBarrierNode(String opInstanceName) {
+ return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
+ }
+
+ /**
+ * Instance shortcut for {@link #getAbortNode(ZKProcedureUtil, String)}.
+ * @param opInstanceName name of the running procedure instance
+ * @return the instance's znode path under this controller's abort znode
+ */
+ public String getAbortZNode(String opInstanceName) {
+ return ZKProcedureUtil.getAbortNode(this, opInstanceName);
+ }
+
+ /**
+ * @return the abort znode itself (the parent of the per-instance abort znodes)
+ */
+ public String getAbortZnode() {
+ return abortZnode;
+ }
+
+ /**
+ * @return the base znode under which the acquired, reached and abort znodes live
+ */
+ public String getBaseZnode() {
+ return baseZNode;
+ }
+
+ /**
+ * @return the acquired barrier znode itself (the parent of the per-instance acquired znodes)
+ */
+ public String getAcquiredBarrier() {
+ return acquiredZnode;
+ }
+
+ /**
+ * @return the member name passed to the constructor
+ */
+ public String getMemberName() {
+ return memberName;
+ }
+
+ /**
+ * Get the full znode path for the node used by the coordinator to trigger a global barrier
+ * acquire on each subprocedure.
+ * @param controller controller running the procedure; its acquired znode is the parent
+ * @param opInstanceName name of the running procedure instance (not the procedure description).
+ * @return full znode path to the prepare barrier/start node
+ */
+ public static String getAcquireBarrierNode(ZKProcedureUtil controller,
+ String opInstanceName) {
+ return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
+ }
+
+ /**
+ * Get the full znode path for the node used by the coordinator to trigger a global barrier
+ * execution and release on each subprocedure.
+ * @param controller controller running the procedure; its reached znode is the parent
+ * @param opInstanceName name of the running procedure instance (not the procedure description).
+ * @return full znode path to the commit barrier
+ */
+ public static String getReachedBarrierNode(ZKProcedureUtil controller,
+ String opInstanceName) {
+ return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
+ }
+
+ /**
+ * Get the full znode path for the node used by the coordinator or member to trigger an abort
+ * of the global barrier acquisition or execution in subprocedures.
+ * @param controller controller running the procedure; its abort znode is the parent
+ * @param opInstanceName name of the running procedure instance (not the procedure description).
+ * @return full znode path to the abort znode
+ */
+ public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
+ return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
+ }
+
+ /**
+ * @return the {@code watcher} held by this listener
+ */
+ public ZooKeeperWatcher getWatcher() {
+ return watcher;
+ }
+
+ /**
+  * Is this a procedure related znode path?
+  * <p>
+  * Strict check: the path must be the base znode itself or lie below it. A sibling znode whose
+  * name merely starts with the same characters does not match.
+  * @param path full znode path to test
+  * @return true if {@code path} is the base znode or one of its descendants
+  */
+ boolean isInProcedurePath(String path) {
+   if (path.equals(baseZNode)) {
+     return true;
+   }
+   // compare against "base/" so that e.g. "/hbase/snapshot2" is not taken for a child of
+   // "/hbase/snapshot"
+   String descendantPrefix = baseZNode.endsWith("/") ? baseZNode : baseZNode + "/";
+   return path.startsWith(descendantPrefix);
+ }
+
+ /**
+ * Is this the exact procedure barrier acquired znode
+ */
+ boolean isAcquiredNode(String path) {
+ return path.equals(acquiredZnode);
+ }
+
+
+ /**
+ * Is this in the procedure barrier acquired znode path (starts with the acquired znode but is
+ * not the acquired znode itself)
+ */
+ boolean isAcquiredPathNode(String path) {
+ return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode);
+ }
+
+ /**
+ * Is this the exact procedure barrier reached znode
+ */
+ boolean isReachedNode(String path) {
+ return path.equals(reachedZnode);
+ }
+
+ /**
+ * Is this in the procedure barrier reached znode path (starts with the reached znode but is
+ * not the reached znode itself)
+ */
+ boolean isReachedPathNode(String path) {
+ return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode);
+ }
+
+
+ /**
+ * Is this the exact procedure barrier abort znode
+ */
+ boolean isAbortNode(String path) {
+ return path.equals(abortZnode);
+ }
+
+ /**
+ * Is this in the procedure barrier abort znode path (starts with the abort znode but is not
+ * the abort znode itself)
+ */
+ public boolean isAbortPathNode(String path) {
+ return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
+ }
+
+ // --------------------------------------------------------------------------
+ // internal debugging methods
+ // --------------------------------------------------------------------------
+ /**
+  * Recursively print the current state of ZK (non-transactional).
+  * <p>
+  * Debugging aid only: does nothing unless debug logging is enabled, and a ZooKeeper failure
+  * while walking the tree is logged rather than thrown, so that it cannot mask the error the
+  * caller is in the middle of handling.
+  * @param root name of the root directory in zk to print
+  */
+ void logZKTree(String root) {
+   if (!LOG.isDebugEnabled()) {
+     return;
+   }
+   LOG.debug("Current zk system:");
+   String prefix = "|-";
+   LOG.debug(prefix + root);
+   try {
+     logZKTree(root, prefix);
+   } catch (KeeperException e) {
+     // best-effort diagnostics; give up on the dump but keep the caller going
+     LOG.warn("Failed to log the zk tree under " + root, e);
+   }
+ }
+
+ /**
+  * Helper method to print the current state of the ZK tree: logs each child of {@code root} at
+  * debug level and recurses into it with a longer prefix.
+  * @see #logZKTree(String)
+  * @param root znode whose children are printed
+  * @param prefix text put in front of each child name at this depth
+  * @throws KeeperException if an unexpected exception occurs
+  */
+ protected void logZKTree(String root, String prefix) throws KeeperException {
+   List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
+   if (children == null) {
+     return;
+   }
+   // children of "/" are joined onto an empty parent
+   String parent = root.equals("/") ? "" : root;
+   String childPrefix = prefix + "---";
+   for (String child : children) {
+     LOG.debug(prefix + child);
+     logZKTree(ZKUtil.joinZNode(parent, child), childPrefix);
+   }
+ }
+
+ /**
+ * Clears the children of the acquired, reached and abort znodes, in that order, by calling
+ * {@code ZKUtil.deleteChildrenRecursively} on each.
+ * @throws KeeperException propagated from the ZooKeeper delete calls
+ */
+ public void clearChildZNodes() throws KeeperException {
+ // TODO This is potentially racy since not atomic. update when we support zk that has multi
+ LOG.info("Clearing all procedure znodes: " + acquiredZnode + " " + reachedZnode + " "
+ + abortZnode);
+
+ // If the coordinator was shutdown mid-procedure, then we are going to lose
+ // a procedure that was previously started by cleaning out all the previous state. It's much
+ // harder to figure out how to keep a procedure going and the subject of HBASE-5487.
+ ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
+ ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
+ ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
+ }
+
+ /**
+  * Removes the acquired, reached and abort znodes of a single procedure by calling
+  * {@code ZKUtil.deleteNodeRecursively} on each, in that order.
+  * @param procedureName name of the procedure instance whose znodes are removed
+  * @throws KeeperException propagated from the ZooKeeper delete calls
+  */
+ public void clearZNodes(String procedureName) throws KeeperException {
+   // TODO This is potentially racy since not atomic. update when we support zk that has multi
+   LOG.info("Clearing all znodes for procedure " + procedureName + " including nodes "
+       + acquiredZnode + " " + reachedZnode + " " + abortZnode);
+   ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
+   ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
+   ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Mon Mar 4 11:24:50 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Protobufs utility: helpers for the magic prefix that marks serialized protobuf content.
+ */
+@SuppressWarnings("deprecation")
+public final class ProtobufUtil {
+
+  private ProtobufUtil() {
+  }
+
+  /**
+   * Magic we put ahead of a serialized protobuf message.
+   * For example, all znode content is protobuf messages with the below magic
+   * for preamble. The array is shared; treat it as read-only.
+   */
+  public static final byte [] PB_MAGIC = new byte [] {'P', 'B', 'U', 'F'};
+
+  /**
+   * Prepend the passed bytes with four bytes of magic, {@link #PB_MAGIC}, to flag what
+   * follows as a protobuf in hbase. Prepend these bytes to all content written to znodes, etc.
+   * @param bytes Bytes to decorate
+   * @return The passed <code>bytes</code> with magic prepended (creates a new
+   *         byte array that is <code>bytes.length</code> plus {@link #PB_MAGIC}.length).
+   */
+  public static byte [] prependPBMagic(final byte [] bytes) {
+    return Bytes.add(PB_MAGIC, bytes);
+  }
+
+  /**
+   * @param bytes Bytes to check; may be null.
+   * @return True if passed <code>bytes</code> has {@link #PB_MAGIC} for a prefix; false for
+   *         null or for arrays shorter than the magic.
+   */
+  public static boolean isPBMagicPrefix(final byte [] bytes) {
+    if (bytes == null || bytes.length < PB_MAGIC.length) {
+      return false;
+    }
+    return Bytes.compareTo(PB_MAGIC, 0, PB_MAGIC.length, bytes, 0, PB_MAGIC.length) == 0;
+  }
+
+  /**
+   * @return Length of {@link #PB_MAGIC}
+   */
+  public static int lengthOfPBMagic() {
+    return PB_MAGIC.length;
+  }
+}